nautilus_dydx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the dYdX adapter.
17//!
18//! This module provides the execution client for submitting orders, cancellations,
19//! and managing positions on dYdX v4.
20//!
21//! # Order Types
22//!
23//! dYdX supports the following order types:
24//!
25//! - **Market**: Execute immediately at best available price
26//! - **Limit**: Execute at specified price or better
27//! - **Stop Market**: Triggered when price crosses stop price, then executes as market order
28//! - **Stop Limit**: Triggered when price crosses stop price, then places limit order
29//! - **Take Profit Market**: Close position at profit target, executes as market order
30//! - **Take Profit Limit**: Close position at profit target, places limit order
31//!
32//! See <https://docs.dydx.xyz/concepts/trading/orders#types> for details.
33//!
34//! # Order Lifetimes
35//!
36//! Orders can be short-term (expire by block height) or long-term/stateful (expire by timestamp).
37//! Conditional orders (Stop/TakeProfit) are always stateful.
38//!
39//! See <https://docs.dydx.xyz/concepts/trading/orders#short-term-vs-long-term> for details.
40
41use std::sync::{
42    Arc, Mutex,
43    atomic::{AtomicU32, AtomicU64, Ordering},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use dashmap::DashMap;
49use nautilus_common::{
50    live::{runner::get_exec_event_sender, runtime::get_runtime},
51    messages::{
52        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
53        execution::{
54            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
55            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
56            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
57        },
58    },
59    msgbus,
60};
61use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
62use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
63use nautilus_model::{
64    accounts::AccountAny,
65    enums::{OmsType, OrderSide, OrderType, TimeInForce},
66    events::{AccountState, OrderCancelRejected, OrderEventAny, OrderRejected},
67    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
68    instruments::{Instrument, InstrumentAny},
69    orders::Order,
70    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
71    types::{AccountBalance, MarginBalance},
72};
73use nautilus_network::retry::RetryConfig;
74use rust_decimal::Decimal;
75use tokio::task::JoinHandle;
76
77use crate::{
78    common::{consts::DYDX_VENUE, credential::DydxCredential, parse::nanos_to_secs_i64},
79    config::DydxAdapterConfig,
80    execution::submitter::OrderSubmitter,
81    grpc::{DydxGrpcClient, Wallet, types::ChainId},
82    http::client::DydxHttpClient,
83    websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage},
84};
85
86pub mod submitter;
87
/// Maximum client order ID value for dYdX (informational - not enforced by adapter).
///
/// dYdX protocol accepts u32 client IDs. For non-numeric client order IDs the current
/// implementation uses sequential allocation starting from 1, which will wrap at
/// `u32::MAX`. If dYdX has a stricter limit, this constant should be updated and
/// enforced in `generate_client_order_id_int`.
pub const MAX_CLIENT_ID: u32 = u32::MAX;
94
/// Execution report types dispatched from WebSocket message handler.
///
/// This enum groups order and fill reports for unified dispatch handling,
/// following the pattern used by reference adapters (Hyperliquid, OKX).
/// Both payloads are boxed, so the enum itself stays pointer-sized.
enum ExecutionReport {
    /// An order status report.
    Order(Box<OrderStatusReport>),
    /// A fill report.
    Fill(Box<FillReport>),
}
103
/// Live execution client for the dYdX v4 exchange adapter.
///
/// Supports Market, Limit, Stop Market, Stop Limit, Take Profit Market (MarketIfTouched),
/// and Take Profit Limit (LimitIfTouched) orders via gRPC. Trailing stops are NOT supported
/// by the dYdX v4 protocol. dYdX requires u32 client IDs - numeric client order ID strings
/// are parsed directly and all other strings are mapped to sequentially allocated integers.
///
/// # Architecture
///
/// The client follows a two-layer execution model:
/// 1. **Synchronous validation** - Immediate checks and event generation
/// 2. **Async submission** - Non-blocking gRPC calls via `OrderSubmitter`
///
/// This matches the pattern used in OKX and other exchange adapters, ensuring
/// consistent behavior across the Nautilus ecosystem.
#[derive(Debug)]
pub struct DydxExecutionClient {
    /// Shared execution client core: trader/account/client identifiers and event generation.
    core: ExecutionClientCore,
    /// Adapter configuration supplied at construction.
    config: DydxAdapterConfig,
    /// HTTP client built from `config`; source of the instrument and CLOB pair ID caches.
    http_client: DydxHttpClient,
    /// WebSocket client; private when a mnemonic is configured, public otherwise.
    ws_client: DydxWebSocketClient,
    /// gRPC client behind an async lock; cloned into order submission tasks.
    grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
    /// Signing wallet; starts as `None`, and submissions fail while it is unset.
    wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
    /// Instrument cache keyed by instrument ID.
    instruments: DashMap<InstrumentId, InstrumentAny>,
    /// Market ticker (e.g., "BTC-USD") → instrument ID.
    market_to_instrument: DashMap<String, InstrumentId>,
    /// CLOB pair ID → instrument ID.
    clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
    /// Block height used for order submission; starts at 0, and orders are rejected
    /// while it is 0. NOTE(review): presumably updated from chain data elsewhere - confirm.
    block_height: Arc<AtomicU64>,
    /// NOTE(review): presumably the latest oracle price per instrument; not read in the
    /// visible part of this module - confirm.
    oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
    /// Client order ID string → dYdX u32 client ID.
    client_id_to_int: DashMap<String, u32>,
    /// dYdX u32 client ID → client order ID string (reverse of `client_id_to_int`).
    int_to_client_id: DashMap<u32, String>,
    /// Next sequential u32 client ID for non-numeric client order IDs; starts at 1.
    next_client_id: AtomicU32,
    /// Wallet address passed to the order submitter.
    wallet_address: String,
    /// Subaccount number passed to the order submitter and used to derive the credential.
    subaccount_number: u32,
    /// Set by `start`, cleared by `stop`.
    started: bool,
    /// Value reported by `is_connected`; cleared by `stop`.
    connected: bool,
    /// Set once instruments have been copied from the HTTP client cache.
    instruments_initialized: bool,
    /// NOTE(review): presumably the handle of the WebSocket stream task; starts as
    /// `None` and is not touched in the visible part of this module - confirm.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of spawned background tasks; aborted when the client stops.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
142
143impl DydxExecutionClient {
144    /// Creates a new [`DydxExecutionClient`].
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the WebSocket client fails to construct.
149    pub fn new(
150        core: ExecutionClientCore,
151        config: DydxAdapterConfig,
152        wallet_address: String,
153        subaccount_number: u32,
154    ) -> anyhow::Result<Self> {
155        // Build HTTP client from config (respects testnet URLs, timeouts, retries)
156        let retry_config = RetryConfig {
157            max_retries: config.max_retries,
158            initial_delay_ms: config.retry_delay_initial_ms,
159            max_delay_ms: config.retry_delay_max_ms,
160            ..Default::default()
161        };
162        let http_client = DydxHttpClient::new(
163            Some(config.base_url.clone()),
164            Some(config.timeout_secs),
165            None, // proxy_url - not in DydxAdapterConfig currently
166            config.is_testnet,
167            Some(retry_config),
168        )?;
169
170        // Use private WebSocket client for authenticated subaccount subscriptions
171        let ws_client = if let Some(ref mnemonic) = config.mnemonic {
172            let credential = DydxCredential::from_mnemonic(
173                mnemonic,
174                subaccount_number,
175                config.authenticator_ids.clone(),
176            )?;
177            DydxWebSocketClient::new_private(
178                config.ws_url.clone(),
179                credential,
180                core.account_id,
181                Some(20),
182            )
183        } else {
184            DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
185        };
186
187        let grpc_urls = config.get_grpc_urls();
188        let grpc_client = Arc::new(tokio::sync::RwLock::new(
189            get_runtime()
190                .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
191                .context("failed to construct dYdX gRPC client")?,
192        ));
193
194        Ok(Self {
195            core,
196            config,
197            http_client,
198            ws_client,
199            grpc_client,
200            wallet: Arc::new(tokio::sync::RwLock::new(None)),
201            instruments: DashMap::new(),
202            market_to_instrument: DashMap::new(),
203            clob_pair_id_to_instrument: DashMap::new(),
204            block_height: Arc::new(AtomicU64::new(0)),
205            oracle_prices: Arc::new(DashMap::new()),
206            client_id_to_int: DashMap::new(),
207            int_to_client_id: DashMap::new(),
208            next_client_id: AtomicU32::new(1),
209            wallet_address,
210            subaccount_number,
211            started: false,
212            connected: false,
213            instruments_initialized: false,
214            ws_stream_handle: None,
215            pending_tasks: Mutex::new(Vec::new()),
216        })
217    }
218
219    /// Generate a unique client order ID integer and store the mapping.
220    ///
221    /// # Invariants
222    ///
223    /// - Same `client_order_id` string → same `u32` for the lifetime of this process
224    /// - Different `client_order_id` strings → different `u32` values (except on u32 wrap)
225    /// - Thread-safe for concurrent calls
226    ///
227    /// # Behavior
228    ///
229    /// - Parses numeric `client_order_id` directly to `u32` for stability across restarts
230    /// - For non-numeric IDs, allocates a new sequential value from an atomic counter
231    /// - Mapping is kept in-memory only; non-numeric IDs will not be recoverable after restart
232    /// - Counter starts at 1 and increments without bound checking (will wrap at u32::MAX)
233    ///
234    /// # Notes
235    ///
236    /// - Atomic counter uses `Relaxed` ordering — uniqueness is required, not cross-thread sequencing
237    /// - If dYdX enforces a maximum client ID below u32::MAX, additional range validation is needed
238    fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
239        // Fast path: already mapped
240        if let Some(existing) = self.client_id_to_int.get(client_order_id) {
241            return *existing.value();
242        }
243
244        // Try parsing as direct integer
245        if let Ok(id) = client_order_id.parse::<u32>() {
246            self.client_id_to_int
247                .insert(client_order_id.to_string(), id);
248            self.int_to_client_id
249                .insert(id, client_order_id.to_string());
250            return id;
251        }
252
253        // Allocate new ID from atomic counter
254        use dashmap::mapref::entry::Entry;
255
256        match self.client_id_to_int.entry(client_order_id.to_string()) {
257            Entry::Occupied(entry) => *entry.get(),
258            Entry::Vacant(vacant) => {
259                let id = self
260                    .next_client_id
261                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
262                vacant.insert(id);
263                self.int_to_client_id
264                    .insert(id, client_order_id.to_string());
265                id
266            }
267        }
268    }
269
270    /// Retrieve the client order ID integer from the cache.
271    ///
272    /// Returns `None` if the mapping doesn't exist.
273    fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
274        // Try parsing first
275        if let Ok(id) = client_order_id.parse::<u32>() {
276            return Some(id);
277        }
278
279        // Look up in cache
280        self.client_id_to_int
281            .get(client_order_id)
282            .map(|entry| *entry.value())
283    }
284
    /// Get the chain ID as resolved by the adapter configuration.
    ///
    /// Delegates to `DydxAdapterConfig::get_chain_id`. This is the recommended way to
    /// get chain_id for all transaction submissions.
    fn get_chain_id(&self) -> ChainId {
        self.config.get_chain_id()
    }
291
292    /// Cache instruments from HTTP client into execution client's lookup maps.
293    ///
294    /// This populates three data structures for efficient lookups:
295    /// - `instruments`: InstrumentId → InstrumentAny
296    /// - `market_to_instrument`: Market ticker (e.g., "BTC-USD") → InstrumentId
297    /// - `clob_pair_id_to_instrument`: CLOB pair ID → InstrumentId
298    fn cache_instruments_from_http(&mut self) {
299        use nautilus_model::instruments::InstrumentAny;
300
301        // Get all instruments from HTTP client cache
302        let instruments: Vec<InstrumentAny> = self
303            .http_client
304            .instruments_cache
305            .iter()
306            .map(|entry| entry.value().clone())
307            .collect();
308
309        tracing::debug!(
310            "Caching {} instruments in execution client",
311            instruments.len()
312        );
313
314        for instrument in instruments {
315            let instrument_id = instrument.id();
316            let symbol = instrument_id.symbol.as_str();
317
318            // Cache instrument by InstrumentId
319            self.instruments.insert(instrument_id, instrument.clone());
320
321            // Cache market ticker → InstrumentId mapping
322            self.market_to_instrument
323                .insert(symbol.to_string(), instrument_id);
324        }
325
326        // Copy clob_pair_id → InstrumentId mapping from HTTP client
327        // The HTTP client populates this from PerpetualMarket.clob_pair_id (authoritative source)
328        let http_mapping = self.http_client.clob_pair_id_mapping();
329        for entry in http_mapping.iter() {
330            self.clob_pair_id_to_instrument
331                .insert(*entry.key(), *entry.value());
332        }
333
334        self.instruments_initialized = true;
335        tracing::info!(
336            "Cached {} instruments ({} CLOB pair IDs) with market mappings",
337            self.instruments.len(),
338            self.clob_pair_id_to_instrument.len()
339        );
340    }
341
342    /// Get an instrument by market ticker (e.g., "BTC-USD").
343    fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
344        self.market_to_instrument
345            .get(market)
346            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
347    }
348
349    /// Get an instrument by clob_pair_id.
350    fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
351        let instrument = self
352            .clob_pair_id_to_instrument
353            .get(&clob_pair_id)
354            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
355
356        if instrument.is_none() {
357            self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
358        }
359
360        instrument
361    }
362
363    fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
364        let known: Vec<(u32, String)> = self
365            .clob_pair_id_to_instrument
366            .iter()
367            .filter_map(|entry| {
368                let instrument_id = entry.value();
369                self.instruments.get(instrument_id).map(|inst_entry| {
370                    (
371                        *entry.key(),
372                        inst_entry.value().id().symbol.as_str().to_string(),
373                    )
374                })
375            })
376            .collect();
377
378        tracing::warn!(
379            "Instrument for clob_pair_id {} not found in cache. Known CLOB pair IDs and symbols: {:?}",
380            clob_pair_id,
381            known
382        );
383    }
384
385    fn spawn_task<F>(&self, label: &'static str, fut: F)
386    where
387        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
388    {
389        let handle = get_runtime().spawn(async move {
390            if let Err(e) = fut.await {
391                tracing::error!("{label}: {e:?}");
392            }
393        });
394
395        self.pending_tasks
396            .lock()
397            .expect(MUTEX_POISONED)
398            .push(handle);
399    }
400
401    /// Spawns an order submission task with error handling and rejection generation.
402    ///
403    /// If the submission fails, generates an `OrderRejected` event with the error details.
404    fn spawn_order_task<F>(
405        &self,
406        label: &'static str,
407        strategy_id: StrategyId,
408        instrument_id: InstrumentId,
409        client_order_id: ClientOrderId,
410        fut: F,
411    ) where
412        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
413    {
414        // Capture necessary data for rejection event
415        let trader_id = self.core.trader_id;
416        let account_id = self.core.account_id;
417        let sender = get_exec_event_sender();
418
419        let handle = get_runtime().spawn(async move {
420            if let Err(e) = fut.await {
421                let error_msg = format!("{label} failed: {e:?}");
422                tracing::error!("{}", error_msg);
423
424                let ts_now = UnixNanos::default(); // Use current time
425                let event = OrderRejected::new(
426                    trader_id,
427                    strategy_id,
428                    instrument_id,
429                    client_order_id,
430                    account_id,
431                    error_msg.into(),
432                    UUID4::new(),
433                    ts_now,
434                    ts_now,
435                    false,
436                    false,
437                );
438
439                if let Err(send_err) =
440                    sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(event)))
441                {
442                    tracing::error!("Failed to send OrderRejected event: {send_err}");
443                }
444            }
445        });
446
447        self.pending_tasks
448            .lock()
449            .expect(MUTEX_POISONED)
450            .push(handle);
451    }
452
453    fn abort_pending_tasks(&self) {
454        let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
455        for handle in guard.drain(..) {
456            handle.abort();
457        }
458    }
459}
460
461#[async_trait(?Send)]
462impl ExecutionClient for DydxExecutionClient {
    /// Returns the client's `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected
    }
466
    /// Returns the client ID held by the execution client core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
470
    /// Returns the account ID held by the execution client core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
474
    /// Returns the dYdX venue (a copy of the crate-level `DYDX_VENUE` value).
    fn venue(&self) -> Venue {
        *DYDX_VENUE
    }
478
    /// Returns the OMS type held by the execution client core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
482
    /// Returns the account from the execution client core, if one is available.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
486
    /// Generates an account state through the execution client core.
    ///
    /// All arguments are forwarded unchanged to the core's `generate_account_state`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the core reports.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
497
498    fn start(&mut self) -> anyhow::Result<()> {
499        if self.started {
500            tracing::warn!("dYdX execution client already started");
501            return Ok(());
502        }
503
504        tracing::info!("Starting dYdX execution client");
505        self.started = true;
506        Ok(())
507    }
508
509    fn stop(&mut self) -> anyhow::Result<()> {
510        if !self.started {
511            tracing::warn!("dYdX execution client not started");
512            return Ok(());
513        }
514
515        tracing::info!("Stopping dYdX execution client");
516        self.abort_pending_tasks();
517        self.started = false;
518        self.connected = false;
519        Ok(())
520    }
521
522    /// Submits an order to dYdX via gRPC.
523    ///
    /// dYdX requires u32 client IDs - numeric Nautilus ClientOrderId strings are parsed
    /// directly and all other strings are mapped to sequentially allocated integers.
525    ///
526    /// Supported order types:
527    /// - Market orders (short-term, IOC)
528    /// - Limit orders (short-term or long-term based on TIF)
529    /// - Stop Market orders (conditional, triggered at stop price)
530    /// - Stop Limit orders (conditional, triggered at stop price, executed at limit)
531    /// - Take Profit Market (MarketIfTouched - triggered at take profit price)
532    /// - Take Profit Limit (LimitIfTouched - triggered at take profit price, executed at limit)
533    ///
534    /// Trailing stop orders are NOT supported by dYdX v4 protocol.
535    ///
536    /// Validates synchronously, generates OrderSubmitted event, then spawns async task for
537    /// gRPC submission to avoid blocking. Unsupported order types generate OrderRejected.
538    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
539        let order = cmd.order.clone();
540
541        // Check connection status
542        if !self.is_connected() {
543            let reason = "Cannot submit order: execution client not connected";
544            tracing::error!("{}", reason);
545            anyhow::bail!(reason);
546        }
547
548        // Check block height is available for short-term orders
549        let current_block = self.block_height.load(Ordering::Relaxed);
550        if current_block == 0 {
551            let reason = "Block height not initialized";
552            tracing::warn!(
553                "Cannot submit order {}: {}",
554                order.client_order_id(),
555                reason
556            );
557            self.core.generate_order_rejected(
558                order.strategy_id(),
559                order.instrument_id(),
560                order.client_order_id(),
561                reason,
562                cmd.ts_init,
563                false,
564            );
565            return Ok(());
566        }
567
568        // Check if order is already closed
569        if order.is_closed() {
570            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
571            return Ok(());
572        }
573
574        // Validate order type
575        match order.order_type() {
576            OrderType::Market | OrderType::Limit => {
577                tracing::debug!(
578                    "Submitting {} order: {}",
579                    if matches!(order.order_type(), OrderType::Market) {
580                        "MARKET"
581                    } else {
582                        "LIMIT"
583                    },
584                    order.client_order_id()
585                );
586            }
587            // Conditional orders (stop/take-profit) - supported by dYdX
588            OrderType::StopMarket | OrderType::StopLimit => {
589                tracing::debug!(
590                    "Submitting {} order: {}",
591                    if matches!(order.order_type(), OrderType::StopMarket) {
592                        "STOP_MARKET"
593                    } else {
594                        "STOP_LIMIT"
595                    },
596                    order.client_order_id()
597                );
598            }
599            // dYdX TakeProfit/TakeProfitLimit map to MarketIfTouched/LimitIfTouched
600            OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
601                tracing::debug!(
602                    "Submitting {} order: {}",
603                    if matches!(order.order_type(), OrderType::MarketIfTouched) {
604                        "TAKE_PROFIT_MARKET"
605                    } else {
606                        "TAKE_PROFIT_LIMIT"
607                    },
608                    order.client_order_id()
609                );
610            }
611            // Trailing stops not supported by dYdX v4 protocol
612            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
613                let reason = "Trailing stop orders not supported by dYdX v4 protocol";
614                tracing::error!("{}", reason);
615                self.core.generate_order_rejected(
616                    order.strategy_id(),
617                    order.instrument_id(),
618                    order.client_order_id(),
619                    reason,
620                    cmd.ts_init,
621                    false,
622                );
623                return Ok(());
624            }
625            order_type => {
626                let reason = format!("Order type {order_type:?} not supported by dYdX");
627                tracing::error!("{}", reason);
628                self.core.generate_order_rejected(
629                    order.strategy_id(),
630                    order.instrument_id(),
631                    order.client_order_id(),
632                    &reason,
633                    cmd.ts_init,
634                    false,
635                );
636                return Ok(());
637            }
638        }
639
640        // Generate OrderSubmitted event immediately
641        self.core.generate_order_submitted(
642            order.strategy_id(),
643            order.instrument_id(),
644            order.client_order_id(),
645            cmd.ts_init,
646        );
647
648        let grpc_client = self.grpc_client.clone();
649        let wallet = self.wallet.clone();
650        let http_client = self.http_client.clone();
651        let wallet_address = self.wallet_address.clone();
652        let subaccount_number = self.subaccount_number;
653        let client_order_id = order.client_order_id();
654        let instrument_id = order.instrument_id();
655        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
656        let chain_id = self.get_chain_id();
657        let authenticator_ids = self.config.authenticator_ids.clone();
658        #[allow(clippy::redundant_clone)]
659        let order_clone = order.clone();
660
661        // Generate client_order_id as u32 before async block (dYdX requires u32 client IDs)
662        let client_id_u32 = self.generate_client_order_id_int(client_order_id.as_str());
663
664        self.spawn_order_task(
665            "submit_order",
666            order.strategy_id(),
667            order.instrument_id(),
668            order.client_order_id(),
669            async move {
670                let wallet_guard = wallet.read().await;
671                let wallet_ref = wallet_guard
672                    .as_ref()
673                    .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
674
675                let grpc_guard = grpc_client.read().await;
676                let submitter = OrderSubmitter::new(
677                    (*grpc_guard).clone(),
678                    http_client.clone(),
679                    wallet_address,
680                    subaccount_number,
681                    chain_id,
682                    authenticator_ids,
683                );
684
685                // Submit order based on type
686                match order_clone.order_type() {
687                    OrderType::Market => {
688                        submitter
689                            .submit_market_order(
690                                wallet_ref,
691                                instrument_id,
692                                client_id_u32,
693                                order_clone.order_side(),
694                                order_clone.quantity(),
695                                block_height,
696                            )
697                            .await?;
698                        tracing::info!("Successfully submitted market order: {}", client_order_id);
699                    }
700                    OrderType::Limit => {
701                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
702                        submitter
703                            .submit_limit_order(
704                                wallet_ref,
705                                instrument_id,
706                                client_id_u32,
707                                order_clone.order_side(),
708                                order_clone
709                                    .price()
710                                    .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
711                                order_clone.quantity(),
712                                order_clone.time_in_force(),
713                                order_clone.is_post_only(),
714                                order_clone.is_reduce_only(),
715                                block_height,
716                                expire_time,
717                            )
718                            .await?;
719                        tracing::info!("Successfully submitted limit order: {}", client_order_id);
720                    }
721                    OrderType::StopMarket => {
722                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
723                            anyhow::anyhow!("Stop market order missing trigger_price")
724                        })?;
725                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
726                        submitter
727                            .submit_stop_market_order(
728                                wallet_ref,
729                                instrument_id,
730                                client_id_u32,
731                                order_clone.order_side(),
732                                trigger_price,
733                                order_clone.quantity(),
734                                order_clone.is_reduce_only(),
735                                expire_time,
736                            )
737                            .await?;
738                        tracing::info!(
739                            "Successfully submitted stop market order: {}",
740                            client_order_id
741                        );
742                    }
743                    OrderType::StopLimit => {
744                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
745                            anyhow::anyhow!("Stop limit order missing trigger_price")
746                        })?;
747                        let limit_price = order_clone.price().ok_or_else(|| {
748                            anyhow::anyhow!("Stop limit order missing limit price")
749                        })?;
750                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
751                        submitter
752                            .submit_stop_limit_order(
753                                wallet_ref,
754                                instrument_id,
755                                client_id_u32,
756                                order_clone.order_side(),
757                                trigger_price,
758                                limit_price,
759                                order_clone.quantity(),
760                                order_clone.time_in_force(),
761                                order_clone.is_post_only(),
762                                order_clone.is_reduce_only(),
763                                expire_time,
764                            )
765                            .await?;
766                        tracing::info!(
767                            "Successfully submitted stop limit order: {}",
768                            client_order_id
769                        );
770                    }
771                    // dYdX TakeProfitMarket maps to Nautilus MarketIfTouched
772                    OrderType::MarketIfTouched => {
773                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
774                            anyhow::anyhow!("Take profit market order missing trigger_price")
775                        })?;
776                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
777                        submitter
778                            .submit_take_profit_market_order(
779                                wallet_ref,
780                                instrument_id,
781                                client_id_u32,
782                                order_clone.order_side(),
783                                trigger_price,
784                                order_clone.quantity(),
785                                order_clone.is_reduce_only(),
786                                expire_time,
787                            )
788                            .await?;
789                        tracing::info!(
790                            "Successfully submitted take profit market order: {}",
791                            client_order_id
792                        );
793                    }
794                    // dYdX TakeProfitLimit maps to Nautilus LimitIfTouched
795                    OrderType::LimitIfTouched => {
796                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
797                            anyhow::anyhow!("Take profit limit order missing trigger_price")
798                        })?;
799                        let limit_price = order_clone.price().ok_or_else(|| {
800                            anyhow::anyhow!("Take profit limit order missing limit price")
801                        })?;
802                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
803                        submitter
804                            .submit_take_profit_limit_order(
805                                wallet_ref,
806                                instrument_id,
807                                client_id_u32,
808                                order_clone.order_side(),
809                                trigger_price,
810                                limit_price,
811                                order_clone.quantity(),
812                                order_clone.time_in_force(),
813                                order_clone.is_post_only(),
814                                order_clone.is_reduce_only(),
815                                expire_time,
816                            )
817                            .await?;
818                        tracing::info!(
819                            "Successfully submitted take profit limit order: {}",
820                            client_order_id
821                        );
822                    }
823                    _ => unreachable!("Order type already validated"),
824                }
825
826                Ok(())
827            },
828        );
829
830        Ok(())
831    }
832
833    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
834        anyhow::bail!("Order lists not supported by dYdX")
835    }
836
837    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
838        anyhow::bail!("Order modification not supported by dYdX")
839    }
840
841    /// Cancels an order on dYdX exchange.
842    ///
843    /// Validates the order state and retrieves instrument details before
844    /// spawning an async task to cancel via gRPC.
845    ///
846    /// # Validation
847    /// - Checks order exists in cache
848    /// - Validates order is not already closed
849    /// - Retrieves instrument from cache for order builder
850    ///
851    /// The `cmd` contains client/venue order IDs. Returns `Ok(())` if cancel request is
852    /// spawned successfully or validation fails gracefully. Returns `Err` if not connected.
853    ///
854    /// # Events
855    /// - `OrderCanceled` - Generated when WebSocket confirms cancellation
856    /// - `OrderCancelRejected` - Generated if exchange rejects cancellation
857    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
858        if !self.is_connected() {
859            anyhow::bail!("Cannot cancel order: not connected");
860        }
861
862        let client_order_id = cmd.client_order_id;
863
864        // Validate order exists in cache and is not closed
865        let cache = self.core.cache();
866        let cache_borrow = cache.borrow();
867
868        let order = match cache_borrow.order(&client_order_id) {
869            Some(order) => order,
870            None => {
871                tracing::error!(
872                    "Cannot cancel order {}: not found in cache",
873                    client_order_id
874                );
875                return Ok(()); // Not an error - order may have been filled/canceled already
876            }
877        };
878
879        // Validate order is not already closed
880        if order.is_closed() {
881            tracing::warn!(
882                "CancelOrder command for {} when order already {} (will not send to exchange)",
883                client_order_id,
884                order.status()
885            );
886            return Ok(());
887        }
888
889        // Retrieve instrument from cache
890        let instrument_id = cmd.instrument_id;
891        let instrument = match cache_borrow.instrument(&instrument_id) {
892            Some(instrument) => instrument,
893            None => {
894                tracing::error!(
895                    "Cannot cancel order {}: instrument {} not found in cache",
896                    client_order_id,
897                    instrument_id
898                );
899                return Ok(()); // Not an error - missing instrument is a cache issue
900            }
901        };
902
903        tracing::debug!(
904            "Cancelling order {} for instrument {}",
905            client_order_id,
906            instrument.id()
907        );
908
909        let grpc_client = self.grpc_client.clone();
910        let wallet = self.wallet.clone();
911        let http_client = self.http_client.clone();
912        let wallet_address = self.wallet_address.clone();
913        let subaccount_number = self.subaccount_number;
914        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
915        let chain_id = self.get_chain_id();
916        let authenticator_ids = self.config.authenticator_ids.clone();
917        let trader_id = cmd.trader_id;
918        let strategy_id = cmd.strategy_id;
919        let venue_order_id = cmd.venue_order_id;
920
921        // Convert client_order_id to u32 before async block
922        let client_id_u32 = match self.get_client_order_id_int(client_order_id.as_str()) {
923            Some(id) => id,
924            None => {
925                tracing::error!("Client order ID {} not found in cache", client_order_id);
926                anyhow::bail!("Client order ID not found in cache")
927            }
928        };
929
930        self.spawn_task("cancel_order", async move {
931            let wallet_guard = wallet.read().await;
932            let wallet_ref = wallet_guard
933                .as_ref()
934                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
935
936            let grpc_guard = grpc_client.read().await;
937            let submitter = OrderSubmitter::new(
938                (*grpc_guard).clone(),
939                http_client.clone(),
940                wallet_address,
941                subaccount_number,
942                chain_id,
943                authenticator_ids,
944            );
945
946            // Attempt cancellation via submitter
947            match submitter
948                .cancel_order(wallet_ref, instrument_id, client_id_u32, block_height)
949                .await
950            {
951                Ok(_) => {
952                    tracing::info!("Successfully cancelled order: {}", client_order_id);
953                }
954                Err(e) => {
955                    tracing::error!("Failed to cancel order {}: {:?}", client_order_id, e);
956
957                    let sender = get_exec_event_sender();
958                    let ts_now = UnixNanos::default();
959                    let event = OrderCancelRejected::new(
960                        trader_id,
961                        strategy_id,
962                        instrument_id,
963                        client_order_id,
964                        format!("Cancel order failed: {e:?}").into(),
965                        UUID4::new(),
966                        ts_now,
967                        ts_now,
968                        false,
969                        venue_order_id,
970                        None, // account_id not available in async context
971                    );
972                    sender
973                        .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
974                        .unwrap();
975                }
976            }
977
978            Ok(())
979        });
980
981        Ok(())
982    }
983
984    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
985        if !self.is_connected() {
986            anyhow::bail!("Cannot cancel orders: not connected");
987        }
988
989        // Query all open orders from cache
990        let cache = self.core.cache().borrow();
991        let mut open_orders: Vec<_> = cache
992            .orders_open(None, None, None, None)
993            .into_iter()
994            .collect();
995
996        let instrument_id = cmd.instrument_id;
997        open_orders.retain(|order| order.instrument_id() == instrument_id);
998
999        // Filter by order_side if specified (NoOrderSide means all sides)
1000        if cmd.order_side != OrderSide::NoOrderSide {
1001            let order_side = cmd.order_side;
1002            open_orders.retain(|order| order.order_side() == order_side);
1003        }
1004
1005        // Split orders into short-term and long-term based on TimeInForce
1006        // Short-term: IOC, FOK (expire by block height)
1007        // Long-term: GTC, GTD, DAY, POST_ONLY (expire by timestamp)
1008        let mut short_term_orders = Vec::new();
1009        let mut long_term_orders = Vec::new();
1010
1011        for order in &open_orders {
1012            match order.time_in_force() {
1013                TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
1014                TimeInForce::Gtc
1015                | TimeInForce::Gtd
1016                | TimeInForce::Day
1017                | TimeInForce::AtTheOpen
1018                | TimeInForce::AtTheClose => long_term_orders.push(order),
1019            }
1020        }
1021
1022        tracing::info!(
1023            "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
1024            open_orders.len(),
1025            short_term_orders.len(),
1026            long_term_orders.len(),
1027            instrument_id,
1028            cmd.order_side
1029        );
1030
1031        // Cancel each order individually (dYdX requires separate transactions)
1032        let grpc_client = self.grpc_client.clone();
1033        let wallet = self.wallet.clone();
1034        let http_client = self.http_client.clone();
1035        let wallet_address = self.wallet_address.clone();
1036        let subaccount_number = self.subaccount_number;
1037        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1038        let chain_id = self.get_chain_id();
1039        let authenticator_ids = self.config.authenticator_ids.clone();
1040
1041        // Collect (instrument_id, client_id) tuples for batch cancel
1042        let mut orders_to_cancel = Vec::new();
1043        for order in &open_orders {
1044            let client_order_id = order.client_order_id();
1045            if let Some(client_id_u32) = self.get_client_order_id_int(client_order_id.as_str()) {
1046                orders_to_cancel.push((instrument_id, client_id_u32));
1047            } else {
1048                tracing::warn!(
1049                    "Cannot cancel order {}: client_order_id not found in cache",
1050                    client_order_id
1051                );
1052            }
1053        }
1054
1055        self.spawn_task("cancel_all_orders", async move {
1056            let wallet_guard = wallet.read().await;
1057            let wallet_ref = wallet_guard
1058                .as_ref()
1059                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1060
1061            let grpc_guard = grpc_client.read().await;
1062            let submitter = OrderSubmitter::new(
1063                (*grpc_guard).clone(),
1064                http_client.clone(),
1065                wallet_address,
1066                subaccount_number,
1067                chain_id,
1068                authenticator_ids,
1069            );
1070
1071            // Cancel orders using batch method (executes sequentially to avoid nonce conflicts)
1072            match submitter
1073                .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1074                .await
1075            {
1076                Ok(_) => {
1077                    tracing::info!("Successfully cancelled {} orders", orders_to_cancel.len());
1078                }
1079                Err(e) => {
1080                    tracing::error!("Batch cancel failed: {:?}", e);
1081                }
1082            }
1083
1084            Ok(())
1085        });
1086
1087        Ok(())
1088    }
1089
1090    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1091        if cmd.cancels.is_empty() {
1092            return Ok(());
1093        }
1094
1095        if !self.is_connected() {
1096            anyhow::bail!("Cannot cancel orders: not connected");
1097        }
1098
1099        // Convert ClientOrderIds to u32 before async block
1100        let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1101        for cancel in &cmd.cancels {
1102            let client_id_str = cancel.client_order_id.as_str();
1103            let client_id_u32 = match self.get_client_order_id_int(client_id_str) {
1104                Some(id) => id,
1105                None => {
1106                    tracing::warn!(
1107                        "No u32 mapping found for client_order_id={}, skipping cancel",
1108                        client_id_str
1109                    );
1110                    continue;
1111                }
1112            };
1113            orders_to_cancel.push((cancel.instrument_id, client_id_u32));
1114        }
1115
1116        if orders_to_cancel.is_empty() {
1117            tracing::warn!("No valid orders to cancel in batch");
1118            return Ok(());
1119        }
1120
1121        let grpc_client = self.grpc_client.clone();
1122        let wallet = self.wallet.clone();
1123        let http_client = self.http_client.clone();
1124        let wallet_address = self.wallet_address.clone();
1125        let subaccount_number = self.subaccount_number;
1126        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1127        let chain_id = self.get_chain_id();
1128        let authenticator_ids = self.config.authenticator_ids.clone();
1129
1130        tracing::info!(
1131            "Batch cancelling {} orders: {:?}",
1132            orders_to_cancel.len(),
1133            orders_to_cancel
1134        );
1135
1136        self.spawn_task("batch_cancel_orders", async move {
1137            let wallet_guard = wallet.read().await;
1138            let wallet_ref = wallet_guard
1139                .as_ref()
1140                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1141
1142            let grpc_guard = grpc_client.read().await;
1143            let submitter = OrderSubmitter::new(
1144                (*grpc_guard).clone(),
1145                http_client.clone(),
1146                wallet_address,
1147                subaccount_number,
1148                chain_id,
1149                authenticator_ids,
1150            );
1151
1152            match submitter
1153                .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1154                .await
1155            {
1156                Ok(()) => {
1157                    tracing::info!(
1158                        "Successfully batch cancelled {} orders",
1159                        orders_to_cancel.len()
1160                    );
1161                }
1162                Err(e) => {
1163                    tracing::error!("Batch cancel failed: {:?}", e);
1164                }
1165            }
1166
1167            Ok(())
1168        });
1169
1170        Ok(())
1171    }
1172
1173    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
1174        Ok(())
1175    }
1176
1177    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
1178        Ok(())
1179    }
1180
1181    async fn connect(&mut self) -> anyhow::Result<()> {
1182        if self.connected {
1183            tracing::warn!("dYdX execution client already connected");
1184            return Ok(());
1185        }
1186
1187        tracing::info!("Connecting to dYdX");
1188
1189        // Load instruments BEFORE WebSocket connection
1190        // Per Python implementation: "instruments are used in the first account channel message"
1191        tracing::debug!("Loading instruments from HTTP API");
1192        self.http_client.fetch_and_cache_instruments().await?;
1193        tracing::info!(
1194            "Loaded {} instruments from HTTP",
1195            self.http_client.instruments_cache.len()
1196        );
1197
1198        // Populate execution client's instrument lookup maps
1199        self.cache_instruments_from_http();
1200
1201        // Initialize wallet from config if mnemonic is provided
1202        if let Some(mnemonic) = &self.config.mnemonic {
1203            let wallet = Wallet::from_mnemonic(mnemonic)?;
1204            *self.wallet.write().await = Some(wallet);
1205            tracing::debug!("Wallet initialized");
1206        }
1207
1208        // Connect WebSocket
1209        self.ws_client.connect().await?;
1210        tracing::debug!("WebSocket connected");
1211
1212        // Subscribe to block height updates
1213        self.ws_client.subscribe_block_height().await?;
1214        tracing::debug!("Subscribed to block height updates");
1215
1216        // Subscribe to markets for instrument data
1217        self.ws_client.subscribe_markets().await?;
1218        tracing::debug!("Subscribed to markets");
1219
1220        // Subscribe to subaccount updates if authenticated
1221        if self.config.mnemonic.is_some() {
1222            self.ws_client
1223                .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
1224                .await?;
1225            tracing::debug!(
1226                "Subscribed to subaccount updates: {}/{}",
1227                self.wallet_address,
1228                self.subaccount_number
1229            );
1230
1231            // Spawn WebSocket message processing task following standard adapter pattern
1232            // Per docs/developer_guide/adapters.md: Parse -> Dispatch -> Engine handles events
1233            if let Some(mut rx) = self.ws_client.take_receiver() {
1234                // Clone data needed for account state parsing in spawned task
1235                let account_id = self.core.account_id;
1236                let instruments = self.instruments.clone();
1237                let oracle_prices = self.oracle_prices.clone();
1238                let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
1239                let block_height = self.block_height.clone();
1240
1241                let handle = get_runtime().spawn(async move {
1242                    while let Some(msg) = rx.recv().await {
1243                        match msg {
1244                            NautilusWsMessage::Order(report) => {
1245                                tracing::debug!("Received order update: {:?}", report.order_status);
1246                                dispatch_execution_report(ExecutionReport::Order(report));
1247                            }
1248                            NautilusWsMessage::Fill(report) => {
1249                                tracing::debug!("Received fill update");
1250                                dispatch_execution_report(ExecutionReport::Fill(report));
1251                            }
1252                            NautilusWsMessage::Position(report) => {
1253                                tracing::debug!("Received position update");
1254                                // Dispatch position status reports via execution event system
1255                                let sender = get_exec_event_sender();
1256                                let exec_report =
1257                                    NautilusExecutionReport::Position(Box::new(*report));
1258                                if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1259                                    tracing::warn!("Failed to send position status report: {e}");
1260                                }
1261                            }
1262                            NautilusWsMessage::AccountState(state) => {
1263                                tracing::debug!("Received account state update");
1264                                dispatch_account_state(*state);
1265                            }
1266                            NautilusWsMessage::SubaccountSubscribed(msg) => {
1267                                tracing::debug!(
1268                                    "Parsing subaccount subscription with full context"
1269                                );
1270
1271                                // Build instruments map for parsing (clone to avoid lifetime issues)
1272                                let inst_map: std::collections::HashMap<_, _> = instruments
1273                                    .iter()
1274                                    .map(|entry| (*entry.key(), entry.value().clone()))
1275                                    .collect();
1276
1277                                // Build oracle prices map (copy Decimals)
1278                                let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1279                                    .iter()
1280                                    .map(|entry| (*entry.key(), *entry.value()))
1281                                    .collect();
1282
1283                                let ts_init =
1284                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1285                                let ts_event = ts_init;
1286
1287                                match crate::http::parse::parse_account_state(
1288                                    &msg.contents.subaccount,
1289                                    account_id,
1290                                    &inst_map,
1291                                    &oracle_map,
1292                                    ts_event,
1293                                    ts_init,
1294                                ) {
1295                                    Ok(account_state) => {
1296                                        tracing::info!(
1297                                            "Parsed account state: {} balance(s), {} margin(s)",
1298                                            account_state.balances.len(),
1299                                            account_state.margins.len()
1300                                        );
1301                                        dispatch_account_state(account_state);
1302                                    }
1303                                    Err(e) => {
1304                                        tracing::error!("Failed to parse account state: {e}");
1305                                    }
1306                                }
1307
1308                                // Parse positions from the subscription
1309                                if let Some(ref positions) =
1310                                    msg.contents.subaccount.open_perpetual_positions
1311                                {
1312                                    tracing::debug!(
1313                                        "Parsing {} position(s) from subscription",
1314                                        positions.len()
1315                                    );
1316
1317                                    for (market, ws_position) in positions {
1318                                        match crate::websocket::parse::parse_ws_position_report(
1319                                            ws_position,
1320                                            &instruments,
1321                                            account_id,
1322                                            ts_init,
1323                                        ) {
1324                                            Ok(report) => {
1325                                                tracing::debug!(
1326                                                    "Parsed position report: {} {} {} {}",
1327                                                    report.instrument_id,
1328                                                    report.position_side,
1329                                                    report.quantity,
1330                                                    market
1331                                                );
1332                                                let sender = get_exec_event_sender();
1333                                                let exec_report = NautilusExecutionReport::Position(
1334                                                    Box::new(report),
1335                                                );
1336                                                if let Err(e) =
1337                                                    sender.send(ExecutionEvent::Report(exec_report))
1338                                                {
1339                                                    tracing::warn!(
1340                                                        "Failed to send position status report: {e}"
1341                                                    );
1342                                                }
1343                                            }
1344                                            Err(e) => {
1345                                                tracing::error!(
1346                                                    "Failed to parse WebSocket position for {}: {e}",
1347                                                    market
1348                                                );
1349                                            }
1350                                        }
1351                                    }
1352                                }
1353                            }
1354                            NautilusWsMessage::SubaccountsChannelData(data) => {
1355                                tracing::debug!("Processing subaccounts channel data");
1356                                let ts_init =
1357                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1358
1359                                // Process orders
1360                                if let Some(ref orders) = data.contents.orders {
1361                                    for ws_order in orders {
1362                                        match crate::websocket::parse::parse_ws_order_report(
1363                                            ws_order,
1364                                            &clob_pair_id_to_instrument,
1365                                            &instruments,
1366                                            account_id,
1367                                            ts_init,
1368                                        ) {
1369                                            Ok(report) => {
1370                                                tracing::debug!(
1371                                                    "Parsed order report: {} {} {} @ {}",
1372                                                    report.instrument_id,
1373                                                    report.order_side,
1374                                                    report.order_status,
1375                                                    report.quantity
1376                                                );
1377                                                let sender = get_exec_event_sender();
1378                                                let exec_report =
1379                                                    NautilusExecutionReport::OrderStatus(Box::new(
1380                                                        report,
1381                                                    ));
1382                                                if let Err(e) =
1383                                                    sender.send(ExecutionEvent::Report(exec_report))
1384                                                {
1385                                                    tracing::warn!(
1386                                                        "Failed to send order status report: {e}"
1387                                                    );
1388                                                }
1389                                            }
1390                                            Err(e) => {
1391                                                tracing::error!(
1392                                                    "Failed to parse WebSocket order: {e}"
1393                                                );
1394                                            }
1395                                        }
1396                                    }
1397                                }
1398
1399                                // Process fills
1400                                if let Some(ref fills) = data.contents.fills {
1401                                    for ws_fill in fills {
1402                                        match crate::websocket::parse::parse_ws_fill_report(
1403                                            ws_fill,
1404                                            &instruments,
1405                                            account_id,
1406                                            ts_init,
1407                                        ) {
1408                                            Ok(report) => {
1409                                                tracing::debug!(
1410                                                    "Parsed fill report: {} {} {} @ {}",
1411                                                    report.instrument_id,
1412                                                    report.venue_order_id,
1413                                                    report.last_qty,
1414                                                    report.last_px
1415                                                );
1416                                                let sender = get_exec_event_sender();
1417                                                let exec_report =
1418                                                    NautilusExecutionReport::Fill(Box::new(report));
1419                                                if let Err(e) =
1420                                                    sender.send(ExecutionEvent::Report(exec_report))
1421                                                {
1422                                                    tracing::warn!(
1423                                                        "Failed to send fill report: {e}"
1424                                                    );
1425                                                }
1426                                            }
1427                                            Err(e) => {
1428                                                tracing::error!(
1429                                                    "Failed to parse WebSocket fill: {e}"
1430                                                );
1431                                            }
1432                                        }
1433                                    }
1434                                }
1435                            }
1436                            NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1437                                tracing::debug!(
1438                                    "Processing oracle price updates for {} markets",
1439                                    oracle_prices_map.len()
1440                                );
1441
1442                                // Update oracle_prices map with new prices
1443                                for (market_symbol, oracle_data) in &oracle_prices_map {
1444                                    // Parse oracle price
1445                                    match oracle_data.oracle_price.parse::<rust_decimal::Decimal>()
1446                                    {
1447                                        Ok(price) => {
1448                                            // Find instrument by symbol (oracle uses raw symbol like "BTC-USD")
1449                                            // Append "-PERP" to match instrument IDs
1450                                            let symbol_with_perp = format!("{market_symbol}-PERP");
1451
1452                                            // Find matching instrument
1453                                            if let Some(entry) = instruments.iter().find(|entry| {
1454                                                entry.value().id().symbol.as_str()
1455                                                    == symbol_with_perp
1456                                            }) {
1457                                                let instrument_id = *entry.key();
1458                                                oracle_prices.insert(instrument_id, price);
1459                                                tracing::trace!(
1460                                                    "Updated oracle price for {}: {}",
1461                                                    instrument_id,
1462                                                    price
1463                                                );
1464                                            } else {
1465                                                tracing::debug!(
1466                                                    "No instrument found for market symbol '{}' (tried '{}')",
1467                                                    market_symbol,
1468                                                    symbol_with_perp
1469                                                );
1470                                            }
1471                                        }
1472                                        Err(e) => {
1473                                            tracing::warn!(
1474                                                "Failed to parse oracle price for {}: {}",
1475                                                market_symbol,
1476                                                e
1477                                            );
1478                                        }
1479                                    }
1480                                }
1481                            }
1482                            NautilusWsMessage::BlockHeight(height) => {
1483                                tracing::debug!("Block height update: {}", height);
1484                                block_height.store(height, std::sync::atomic::Ordering::Relaxed);
1485                            }
1486                            NautilusWsMessage::Error(err) => {
1487                                tracing::error!("WebSocket error: {:?}", err);
1488                            }
1489                            NautilusWsMessage::Reconnected => {
1490                                tracing::info!("WebSocket reconnected");
1491                            }
1492                            _ => {
1493                                // Data, Deltas are for market data, not execution
1494                            }
1495                        }
1496                    }
1497                    tracing::info!("WebSocket message processing task ended");
1498                });
1499
1500                self.ws_stream_handle = Some(handle);
1501                tracing::debug!("Spawned WebSocket message processing task");
1502            }
1503        }
1504
1505        self.connected = true;
1506        tracing::info!(client_id = %self.core.client_id, "Connected");
1507        Ok(())
1508    }
1509
1510    async fn disconnect(&mut self) -> anyhow::Result<()> {
1511        if !self.connected {
1512            tracing::warn!("dYdX execution client not connected");
1513            return Ok(());
1514        }
1515
1516        tracing::info!("Disconnecting from dYdX");
1517
1518        // Unsubscribe from subaccount updates if authenticated
1519        if self.config.mnemonic.is_some() {
1520            let _ = self
1521                .ws_client
1522                .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1523                .await
1524                .map_err(|e| tracing::warn!("Failed to unsubscribe from subaccount: {e}"));
1525        }
1526
1527        // Unsubscribe from markets
1528        let _ = self
1529            .ws_client
1530            .unsubscribe_markets()
1531            .await
1532            .map_err(|e| tracing::warn!("Failed to unsubscribe from markets: {e}"));
1533
1534        // Unsubscribe from block height
1535        let _ = self
1536            .ws_client
1537            .unsubscribe_block_height()
1538            .await
1539            .map_err(|e| tracing::warn!("Failed to unsubscribe from block height: {e}"));
1540
1541        // Disconnect WebSocket
1542        self.ws_client.disconnect().await?;
1543
1544        // Abort WebSocket message processing task
1545        if let Some(handle) = self.ws_stream_handle.take() {
1546            handle.abort();
1547            tracing::debug!("Aborted WebSocket message processing task");
1548        }
1549
1550        // Abort any pending tasks
1551        self.abort_pending_tasks();
1552
1553        self.connected = false;
1554        tracing::info!(client_id = %self.core.client_id, "Disconnected");
1555        Ok(())
1556    }
1557
1558    async fn generate_order_status_report(
1559        &self,
1560        cmd: &GenerateOrderStatusReport,
1561    ) -> anyhow::Result<Option<OrderStatusReport>> {
1562        // Query single order from dYdX API
1563        let response = self
1564            .http_client
1565            .inner
1566            .get_orders(
1567                &self.wallet_address,
1568                self.subaccount_number,
1569                None,    // market filter
1570                Some(1), // limit to 1 result
1571            )
1572            .await
1573            .context("failed to fetch order from dYdX API")?;
1574
1575        if response.is_empty() {
1576            return Ok(None);
1577        }
1578
1579        let order = &response[0];
1580        let ts_init = UnixNanos::default();
1581
1582        let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1583            Some(inst) => inst,
1584            None => return Ok(None),
1585        };
1586
1587        let report = crate::http::parse::parse_order_status_report(
1588            order,
1589            &instrument,
1590            self.core.account_id,
1591            ts_init,
1592        )
1593        .context("failed to parse order status report")?;
1594
1595        if let Some(client_order_id) = cmd.client_order_id
1596            && report.client_order_id != Some(client_order_id)
1597        {
1598            return Ok(None);
1599        }
1600
1601        if let Some(venue_order_id) = cmd.venue_order_id
1602            && report.venue_order_id.as_str() != venue_order_id.as_str()
1603        {
1604            return Ok(None);
1605        }
1606
1607        if let Some(instrument_id) = cmd.instrument_id
1608            && report.instrument_id != instrument_id
1609        {
1610            return Ok(None);
1611        }
1612
1613        Ok(Some(report))
1614    }
1615
1616    async fn generate_order_status_reports(
1617        &self,
1618        cmd: &GenerateOrderStatusReports,
1619    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1620        // Query orders from dYdX API
1621        let response = self
1622            .http_client
1623            .inner
1624            .get_orders(
1625                &self.wallet_address,
1626                self.subaccount_number,
1627                None, // market filter
1628                None, // limit
1629            )
1630            .await
1631            .context("failed to fetch orders from dYdX API")?;
1632
1633        let mut reports = Vec::new();
1634        let ts_init = UnixNanos::default();
1635
1636        for order in response {
1637            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1638                Some(inst) => inst,
1639                None => continue,
1640            };
1641
1642            if let Some(filter_id) = cmd.instrument_id
1643                && instrument.id() != filter_id
1644            {
1645                continue;
1646            }
1647
1648            let report = match crate::http::parse::parse_order_status_report(
1649                &order,
1650                &instrument,
1651                self.core.account_id,
1652                ts_init,
1653            ) {
1654                Ok(r) => r,
1655                Err(e) => {
1656                    tracing::warn!("Failed to parse order status report: {e}");
1657                    continue;
1658                }
1659            };
1660
1661            reports.push(report);
1662        }
1663
1664        // Filter by open_only if specified
1665        if cmd.open_only {
1666            reports.retain(|r| r.order_status.is_open());
1667        }
1668
1669        // Filter by time range if specified
1670        if let Some(start) = cmd.start {
1671            reports.retain(|r| r.ts_last >= start);
1672        }
1673        if let Some(end) = cmd.end {
1674            reports.retain(|r| r.ts_last <= end);
1675        }
1676
1677        Ok(reports)
1678    }
1679
1680    async fn generate_fill_reports(
1681        &self,
1682        cmd: GenerateFillReports,
1683    ) -> anyhow::Result<Vec<FillReport>> {
1684        let response = self
1685            .http_client
1686            .inner
1687            .get_fills(
1688                &self.wallet_address,
1689                self.subaccount_number,
1690                None, // market filter
1691                None, // limit
1692            )
1693            .await
1694            .context("failed to fetch fills from dYdX API")?;
1695
1696        let mut reports = Vec::new();
1697        let ts_init = UnixNanos::default();
1698
1699        for fill in response.fills {
1700            let instrument = match self.get_instrument_by_market(&fill.market) {
1701                Some(inst) => inst,
1702                None => {
1703                    tracing::warn!("Unknown market in fill: {}", fill.market);
1704                    continue;
1705                }
1706            };
1707
1708            if let Some(filter_id) = cmd.instrument_id
1709                && instrument.id() != filter_id
1710            {
1711                continue;
1712            }
1713
1714            let report = match crate::http::parse::parse_fill_report(
1715                &fill,
1716                &instrument,
1717                self.core.account_id,
1718                ts_init,
1719            ) {
1720                Ok(r) => r,
1721                Err(e) => {
1722                    tracing::warn!("Failed to parse fill report: {e}");
1723                    continue;
1724                }
1725            };
1726
1727            reports.push(report);
1728        }
1729
1730        if let Some(venue_order_id) = cmd.venue_order_id {
1731            reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
1732        }
1733
1734        Ok(reports)
1735    }
1736
1737    async fn generate_position_status_reports(
1738        &self,
1739        cmd: &GeneratePositionStatusReports,
1740    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1741        // Query subaccount positions from dYdX API
1742        let response = self
1743            .http_client
1744            .inner
1745            .get_subaccount(&self.wallet_address, self.subaccount_number)
1746            .await
1747            .context("failed to fetch subaccount from dYdX API")?;
1748
1749        let mut reports = Vec::new();
1750        let ts_init = UnixNanos::default();
1751
1752        for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
1753            let instrument = match self.get_instrument_by_market(market_ticker) {
1754                Some(inst) => inst,
1755                None => {
1756                    tracing::warn!("Unknown market in position: {}", market_ticker);
1757                    continue;
1758                }
1759            };
1760
1761            if let Some(filter_id) = cmd.instrument_id
1762                && instrument.id() != filter_id
1763            {
1764                continue;
1765            }
1766
1767            let report = match crate::http::parse::parse_position_status_report(
1768                perp_position,
1769                &instrument,
1770                self.core.account_id,
1771                ts_init,
1772            ) {
1773                Ok(r) => r,
1774                Err(e) => {
1775                    tracing::warn!("Failed to parse position status report: {e}");
1776                    continue;
1777                }
1778            };
1779
1780            reports.push(report);
1781        }
1782
1783        Ok(reports)
1784    }
1785
1786    async fn generate_mass_status(
1787        &self,
1788        lookback_mins: Option<u64>,
1789    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1790        let ts_init = UnixNanos::default();
1791
1792        // Query orders
1793        let orders_response = self
1794            .http_client
1795            .inner
1796            .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1797            .await
1798            .context("failed to fetch orders for mass status")?;
1799
1800        // Query subaccount for positions
1801        let subaccount_response = self
1802            .http_client
1803            .inner
1804            .get_subaccount(&self.wallet_address, self.subaccount_number)
1805            .await
1806            .context("failed to fetch subaccount for mass status")?;
1807
1808        // Query fills
1809        let fills_response = self
1810            .http_client
1811            .inner
1812            .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1813            .await
1814            .context("failed to fetch fills for mass status")?;
1815
1816        // Parse order reports
1817        let mut order_reports = Vec::new();
1818        let mut orders_filtered = 0usize;
1819        for order in orders_response {
1820            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1821                Some(inst) => inst,
1822                None => {
1823                    orders_filtered += 1;
1824                    continue;
1825                }
1826            };
1827
1828            match crate::http::parse::parse_order_status_report(
1829                &order,
1830                &instrument,
1831                self.core.account_id,
1832                ts_init,
1833            ) {
1834                Ok(r) => order_reports.push(r),
1835                Err(e) => {
1836                    tracing::warn!("Failed to parse order status report: {e}");
1837                    orders_filtered += 1;
1838                }
1839            }
1840        }
1841
1842        // Parse position reports
1843        let mut position_reports = Vec::new();
1844        for (market_ticker, perp_position) in
1845            &subaccount_response.subaccount.open_perpetual_positions
1846        {
1847            let instrument = match self.get_instrument_by_market(market_ticker) {
1848                Some(inst) => inst,
1849                None => continue,
1850            };
1851
1852            match crate::http::parse::parse_position_status_report(
1853                perp_position,
1854                &instrument,
1855                self.core.account_id,
1856                ts_init,
1857            ) {
1858                Ok(r) => position_reports.push(r),
1859                Err(e) => {
1860                    tracing::warn!("Failed to parse position status report: {e}");
1861                }
1862            }
1863        }
1864
1865        // Parse fill reports
1866        let mut fill_reports = Vec::new();
1867        let mut fills_filtered = 0usize;
1868        for fill in fills_response.fills {
1869            let instrument = match self.get_instrument_by_market(&fill.market) {
1870                Some(inst) => inst,
1871                None => {
1872                    fills_filtered += 1;
1873                    continue;
1874                }
1875            };
1876
1877            match crate::http::parse::parse_fill_report(
1878                &fill,
1879                &instrument,
1880                self.core.account_id,
1881                ts_init,
1882            ) {
1883                Ok(r) => fill_reports.push(r),
1884                Err(e) => {
1885                    tracing::warn!("Failed to parse fill report: {e}");
1886                    fills_filtered += 1;
1887                }
1888            }
1889        }
1890
1891        if lookback_mins.is_some() {
1892            tracing::debug!(
1893                "lookback_mins={:?} filtering not yet implemented. Returning all: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1894                lookback_mins,
1895                order_reports.len(),
1896                orders_filtered,
1897                position_reports.len(),
1898                fill_reports.len(),
1899                fills_filtered
1900            );
1901        } else {
1902            tracing::info!(
1903                "Generated mass status: {} orders, {} positions, {} fills",
1904                order_reports.len(),
1905                position_reports.len(),
1906                fill_reports.len()
1907            );
1908        }
1909
1910        // Create mass status and add reports
1911        let mut mass_status = ExecutionMassStatus::new(
1912            self.core.client_id,
1913            self.core.account_id,
1914            self.core.venue,
1915            ts_init,
1916            None, // report_id will be auto-generated
1917        );
1918
1919        mass_status.add_order_reports(order_reports);
1920        mass_status.add_position_reports(position_reports);
1921        mass_status.add_fill_reports(fill_reports);
1922
1923        Ok(Some(mass_status))
1924    }
1925}
1926
1927/// Dispatches account state events to the portfolio.
1928///
1929/// AccountState events are routed to the Portfolio (not ExecEngine) via msgbus.
1930/// This follows the pattern used by BitMEX, OKX, and other reference adapters.
1931fn dispatch_account_state(state: AccountState) {
1932    use std::any::Any;
1933    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
1934}
1935
1936/// Dispatches execution reports to the execution engine.
1937///
1938/// This follows the standard adapter pattern where WebSocket handlers parse messages
1939/// into reports, and a dispatch function sends them via the execution event system.
1940/// The execution engine then handles cache lookups and event generation.
1941///
1942/// # Architecture
1943///
1944/// Per `docs/developer_guide/adapters.md`, adapters should:
1945/// 1. Parse WebSocket messages into ExecutionReports in the handler
1946/// 2. Dispatch reports via `get_exec_event_sender()`
1947/// 3. Let the execution engine handle event generation (has cache access)
1948///
1949/// This pattern is used by Hyperliquid, OKX, BitMEX, and other reference adapters.
1950fn dispatch_execution_report(report: ExecutionReport) {
1951    let sender = get_exec_event_sender();
1952    match report {
1953        ExecutionReport::Order(order_report) => {
1954            tracing::debug!(
1955                "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
1956                order_report.order_status,
1957                order_report.venue_order_id,
1958                order_report.client_order_id
1959            );
1960            let exec_report = NautilusExecutionReport::OrderStatus(order_report);
1961            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1962                tracing::warn!("Failed to send order status report: {e}");
1963            }
1964        }
1965        ExecutionReport::Fill(fill_report) => {
1966            tracing::debug!(
1967                "Dispatching fill report: venue_order_id={}, trade_id={}",
1968                fill_report.venue_order_id,
1969                fill_report.trade_id
1970            );
1971            let exec_report = NautilusExecutionReport::Fill(fill_report);
1972            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1973                tracing::warn!("Failed to send fill report: {e}");
1974            }
1975        }
1976    }
1977}