nautilus_dydx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the dYdX adapter.
17
18use std::sync::{Arc, Mutex, atomic::AtomicU64};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use dashmap::DashMap;
23use nautilus_common::{
24    messages::{
25        ExecutionEvent,
26        execution::{
27            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28            GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
29            QueryOrder, SubmitOrder, SubmitOrderList,
30        },
31    },
32    msgbus,
33    runner::get_exec_event_sender,
34    runtime::get_runtime,
35};
36use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
37use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
38use nautilus_live::execution::client::LiveExecutionClient;
39use nautilus_model::{
40    accounts::AccountAny,
41    enums::{OmsType, OrderSide, OrderType, TimeInForce},
42    events::{OrderCancelRejected, OrderEventAny, OrderRejected},
43    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
44    instruments::{Instrument, InstrumentAny},
45    orders::Order,
46    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47    types::{AccountBalance, MarginBalance},
48};
49use rust_decimal::Decimal;
50use tokio::task::JoinHandle;
51
52use crate::{
53    common::{consts::DYDX_VENUE, credential::DydxCredential},
54    config::DydxAdapterConfig,
55    execution::submitter::OrderSubmitter,
56    grpc::{DydxGrpcClient, OrderBuilder, Wallet},
57    http::client::DydxHttpClient,
58    websocket::client::DydxWebSocketClient,
59};
60
61pub mod submitter;
62
63/// Maximum client order ID value for dYdX.
64pub const MAX_CLIENT_ID: u32 = u32::MAX;
65
66/// Execution report types dispatched from WebSocket message handler.
67///
68/// This enum groups order and fill reports for unified dispatch handling,
69/// following the pattern used by reference adapters (Hyperliquid, OKX).
70enum ExecutionReport {
71    Order(Box<OrderStatusReport>),
72    Fill(Box<FillReport>),
73}
74
75/// Live execution client for the dYdX v4 exchange adapter.
76///
77/// Supports Market and Limit orders via gRPC. Conditional orders (Stop, Take Profit,
78/// Trailing Stop) planned for future releases. dYdX requires u32 client IDs - strings
79/// are hashed to fit this constraint.
80///
81/// # Architecture
82///
83/// The client follows a two-layer execution model:
84/// 1. **Synchronous validation** - Immediate checks and event generation
85/// 2. **Async submission** - Non-blocking gRPC calls via `OrderSubmitter`
86///
87/// This matches the pattern used in OKX and other exchange adapters, ensuring
88/// consistent behavior across the Nautilus ecosystem.
89#[derive(Debug)]
90#[allow(dead_code)] // TODO: Remove once implementation is complete
91pub struct DydxExecutionClient {
92    core: ExecutionClientCore,
93    config: DydxAdapterConfig,
94    http_client: DydxHttpClient,
95    ws_client: DydxWebSocketClient,
96    grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
97    wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
98    order_builders: DashMap<InstrumentId, OrderBuilder>,
99    // NOTE: Currently unpopulated - instrument loading not yet implemented
100    instruments: DashMap<InstrumentId, InstrumentAny>,
101    market_to_instrument: DashMap<String, InstrumentId>,
102    clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
103    block_height: AtomicU64,
104    oracle_prices: DashMap<InstrumentId, Decimal>,
105    client_id_to_int: DashMap<String, u32>,
106    int_to_client_id: DashMap<u32, String>,
107    wallet_address: String,
108    subaccount_number: u32,
109    started: bool,
110    connected: bool,
111    instruments_initialized: bool,
112    ws_stream_handle: Option<JoinHandle<()>>,
113    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
114}
115
116impl DydxExecutionClient {
117    /// Creates a new [`DydxExecutionClient`].
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the WebSocket client fails to construct.
122    pub fn new(
123        core: ExecutionClientCore,
124        config: DydxAdapterConfig,
125        wallet_address: String,
126        subaccount_number: u32,
127    ) -> anyhow::Result<Self> {
128        let http_client = DydxHttpClient::default();
129
130        // Use private WebSocket client for authenticated subaccount subscriptions
131        let ws_client = if let Some(ref mnemonic) = config.mnemonic {
132            let credential = DydxCredential::from_mnemonic(mnemonic, subaccount_number, vec![])?;
133            DydxWebSocketClient::new_private(
134                config.ws_url.clone(),
135                credential,
136                core.account_id,
137                Some(20),
138            )
139        } else {
140            DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
141        };
142
143        let grpc_urls = config.get_grpc_urls();
144        let grpc_client = Arc::new(tokio::sync::RwLock::new(
145            get_runtime()
146                .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
147                .context("failed to construct dYdX gRPC client")?,
148        ));
149
150        Ok(Self {
151            core,
152            config,
153            http_client,
154            ws_client,
155            grpc_client,
156            wallet: Arc::new(tokio::sync::RwLock::new(None)),
157            order_builders: DashMap::new(),
158            instruments: DashMap::new(),
159            market_to_instrument: DashMap::new(),
160            clob_pair_id_to_instrument: DashMap::new(),
161            block_height: AtomicU64::new(0),
162            oracle_prices: DashMap::new(),
163            client_id_to_int: DashMap::new(),
164            int_to_client_id: DashMap::new(),
165            wallet_address,
166            subaccount_number,
167            started: false,
168            connected: false,
169            instruments_initialized: false,
170            ws_stream_handle: None,
171            pending_tasks: Mutex::new(Vec::new()),
172        })
173    }
174
175    /// Generate a unique client order ID integer and store the mapping.
176    ///
177    /// Attempts to parse the client_order_id as an integer first. If that fails,
178    /// generates a random value within the valid range [0, MAX_CLIENT_ID).
179    #[allow(dead_code)] // TODO: Remove once used in submit_order
180    fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
181        if let Ok(id) = client_order_id.parse::<u32>() {
182            self.client_id_to_int
183                .insert(client_order_id.to_string(), id);
184            self.int_to_client_id
185                .insert(id, client_order_id.to_string());
186            return id;
187        }
188
189        // Generate random value if parsing fails
190        let id = rand::random::<u32>();
191        self.client_id_to_int
192            .insert(client_order_id.to_string(), id);
193        self.int_to_client_id
194            .insert(id, client_order_id.to_string());
195        id
196    }
197
198    /// Retrieve the client order ID integer from the cache.
199    ///
200    /// Returns `None` if the mapping doesn't exist.
201    #[allow(dead_code)] // TODO: Remove once used in cancel_order
202    fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
203        // Try parsing first
204        if let Ok(id) = client_order_id.parse::<u32>() {
205            return Some(id);
206        }
207
208        // Look up in cache
209        self.client_id_to_int
210            .get(client_order_id)
211            .map(|entry| *entry.value())
212    }
213
214    /// Retrieve the client order ID string from the integer value.
215    ///
216    /// Returns the integer as a string if no mapping exists.
217    #[allow(dead_code)] // TODO: Remove once used in handle_order_message
218    fn get_client_order_id(&self, client_order_id_int: u32) -> String {
219        self.int_to_client_id.get(&client_order_id_int).map_or_else(
220            || client_order_id_int.to_string(),
221            |entry| entry.value().clone(),
222        )
223    }
224
225    /// Cache instruments from HTTP client into execution client's lookup maps.
226    ///
227    /// This populates three data structures for efficient lookups:
228    /// - `instruments`: InstrumentId → InstrumentAny
229    /// - `market_to_instrument`: Market ticker (e.g., "BTC-USD") → InstrumentId
230    /// - `clob_pair_id_to_instrument`: CLOB pair ID → InstrumentId
231    fn cache_instruments_from_http(&mut self) {
232        use nautilus_model::instruments::InstrumentAny;
233
234        // Get all instruments from HTTP client cache
235        let instruments: Vec<InstrumentAny> = self
236            .http_client
237            .instruments_cache
238            .iter()
239            .map(|entry| entry.value().clone())
240            .collect();
241
242        tracing::debug!(
243            "Caching {} instruments in execution client",
244            instruments.len()
245        );
246
247        for instrument in instruments {
248            let instrument_id = instrument.id();
249            let symbol = instrument_id.symbol.as_str();
250
251            // Cache instrument by InstrumentId
252            self.instruments.insert(instrument_id, instrument.clone());
253
254            // Cache market ticker → InstrumentId mapping
255            self.market_to_instrument
256                .insert(symbol.to_string(), instrument_id);
257        }
258
259        // Copy clob_pair_id → InstrumentId mapping from HTTP client
260        // The HTTP client populates this from PerpetualMarket.clob_pair_id (authoritative source)
261        let http_mapping = self.http_client.clob_pair_id_mapping();
262        for entry in http_mapping.iter() {
263            self.clob_pair_id_to_instrument
264                .insert(*entry.key(), *entry.value());
265        }
266
267        self.instruments_initialized = true;
268        tracing::info!(
269            "Cached {} instruments ({} CLOB pair IDs) with market mappings",
270            self.instruments.len(),
271            self.clob_pair_id_to_instrument.len()
272        );
273    }
274
275    /// Get an instrument by market ticker (e.g., "BTC-USD").
276    fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
277        self.market_to_instrument
278            .get(market)
279            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
280    }
281
282    /// Get an instrument by clob_pair_id.
283    fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
284        let instrument = self
285            .clob_pair_id_to_instrument
286            .get(&clob_pair_id)
287            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
288
289        if instrument.is_none() {
290            self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
291        }
292
293        instrument
294    }
295
296    fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
297        let known: Vec<(u32, String)> = self
298            .clob_pair_id_to_instrument
299            .iter()
300            .filter_map(|entry| {
301                let instrument_id = entry.value();
302                self.instruments.get(instrument_id).map(|inst_entry| {
303                    (
304                        *entry.key(),
305                        inst_entry.value().id().symbol.as_str().to_string(),
306                    )
307                })
308            })
309            .collect();
310
311        tracing::warn!(
312            "Instrument for clob_pair_id {} not found in cache. Known CLOB pair IDs and symbols: {:?}",
313            clob_pair_id,
314            known
315        );
316    }
317
318    fn spawn_task<F>(&self, label: &'static str, fut: F)
319    where
320        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
321    {
322        let handle = tokio::spawn(async move {
323            if let Err(e) = fut.await {
324                tracing::error!("{label}: {e:?}");
325            }
326        });
327
328        self.pending_tasks
329            .lock()
330            .expect(MUTEX_POISONED)
331            .push(handle);
332    }
333
334    /// Spawns an order submission task with error handling and rejection generation.
335    ///
336    /// If the submission fails, generates an `OrderRejected` event with the error details.
337    fn spawn_order_task<F>(
338        &self,
339        label: &'static str,
340        strategy_id: StrategyId,
341        instrument_id: InstrumentId,
342        client_order_id: ClientOrderId,
343        fut: F,
344    ) where
345        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
346    {
347        // Capture necessary data for rejection event
348        let trader_id = self.core.trader_id;
349        let account_id = self.core.account_id;
350        let sender = get_exec_event_sender();
351
352        let handle = tokio::spawn(async move {
353            if let Err(e) = fut.await {
354                let error_msg = format!("{label} failed: {e:?}");
355                tracing::error!("{}", error_msg);
356
357                // Generate OrderRejected event on submission failure
358                let ts_now = UnixNanos::default(); // Use current time
359                let event = OrderRejected::new(
360                    trader_id,
361                    strategy_id,
362                    instrument_id,
363                    client_order_id,
364                    account_id,
365                    error_msg.into(),
366                    UUID4::new(),
367                    ts_now,
368                    ts_now,
369                    false,
370                    false,
371                );
372
373                if let Err(send_err) =
374                    sender.send(nautilus_common::messages::ExecutionEvent::Order(
375                        OrderEventAny::Rejected(event),
376                    ))
377                {
378                    tracing::error!("Failed to send OrderRejected event: {send_err}");
379                }
380            }
381        });
382
383        self.pending_tasks
384            .lock()
385            .expect(MUTEX_POISONED)
386            .push(handle);
387    }
388
389    fn abort_pending_tasks(&self) {
390        let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
391        for handle in guard.drain(..) {
392            handle.abort();
393        }
394    }
395}
396impl ExecutionClient for DydxExecutionClient {
397    fn is_connected(&self) -> bool {
398        self.connected
399    }
400
401    fn client_id(&self) -> ClientId {
402        self.core.client_id
403    }
404
405    fn account_id(&self) -> AccountId {
406        self.core.account_id
407    }
408
409    fn venue(&self) -> Venue {
410        *DYDX_VENUE
411    }
412
413    fn oms_type(&self) -> OmsType {
414        self.core.oms_type
415    }
416
417    fn get_account(&self) -> Option<AccountAny> {
418        self.core.get_account()
419    }
420
421    fn generate_account_state(
422        &self,
423        balances: Vec<AccountBalance>,
424        margins: Vec<MarginBalance>,
425        reported: bool,
426        ts_event: UnixNanos,
427    ) -> anyhow::Result<()> {
428        self.core
429            .generate_account_state(balances, margins, reported, ts_event)
430    }
431
432    fn start(&mut self) -> anyhow::Result<()> {
433        if self.started {
434            tracing::warn!("dYdX execution client already started");
435            return Ok(());
436        }
437
438        tracing::info!("Starting dYdX execution client");
439        self.started = true;
440        Ok(())
441    }
442
443    fn stop(&mut self) -> anyhow::Result<()> {
444        if !self.started {
445            tracing::warn!("dYdX execution client not started");
446            return Ok(());
447        }
448
449        tracing::info!("Stopping dYdX execution client");
450        self.abort_pending_tasks();
451        self.started = false;
452        self.connected = false;
453        Ok(())
454    }
455
456    /// Submits an order to dYdX via gRPC.
457    ///
458    /// dYdX requires u32 client IDs - Nautilus ClientOrderId strings are hashed to fit.
459    /// Only Market and Limit orders supported currently. Conditional orders (Stop, Take Profit)
460    /// will be implemented in future releases.
461    ///
462    /// Validates synchronously, generates OrderSubmitted event, then spawns async task for
463    /// gRPC submission to avoid blocking. Unsupported order types generate OrderRejected.
464    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
465        let order = cmd.order.clone();
466
467        // Check connection status
468        if !self.is_connected() {
469            let reason = "Cannot submit order: execution client not connected";
470            tracing::error!("{}", reason);
471            anyhow::bail!(reason);
472        }
473
474        // Check if order is already closed
475        if order.is_closed() {
476            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
477            return Ok(());
478        }
479
480        // Validate order type - only market and limit orders supported
481        match order.order_type() {
482            OrderType::Market | OrderType::Limit => {
483                // Supported order types
484                tracing::debug!(
485                    "Submitting {} order: {}",
486                    if matches!(order.order_type(), OrderType::Market) {
487                        "MARKET"
488                    } else {
489                        "LIMIT"
490                    },
491                    order.client_order_id()
492                );
493            }
494            // Conditional order stubs - accept but don't submit until proto implementation
495            OrderType::StopMarket
496            | OrderType::StopLimit
497            | OrderType::MarketIfTouched
498            | OrderType::LimitIfTouched
499            | OrderType::TrailingStopMarket
500            | OrderType::TrailingStopLimit => {
501                self.core.generate_order_submitted(
502                    order.strategy_id(),
503                    order.instrument_id(),
504                    order.client_order_id(),
505                    cmd.ts_init,
506                );
507                tracing::warn!(
508                    order_type = ?order.order_type(),
509                    client_order_id = %order.client_order_id(),
510                    "Conditional order stub: OrderSubmitted generated but not sent to exchange (proto implementation pending)"
511                );
512                return Ok(());
513            }
514            order_type => {
515                let reason = format!("Order type {:?} not supported by dYdX", order_type);
516                tracing::error!("{}", reason);
517                self.core.generate_order_rejected(
518                    order.strategy_id(),
519                    order.instrument_id(),
520                    order.client_order_id(),
521                    &reason,
522                    cmd.ts_init,
523                    false,
524                );
525                return Ok(());
526            }
527        }
528
529        // Generate OrderSubmitted event immediately
530        self.core.generate_order_submitted(
531            order.strategy_id(),
532            order.instrument_id(),
533            order.client_order_id(),
534            cmd.ts_init,
535        );
536
537        let grpc_client = self.grpc_client.clone();
538        let wallet = self.wallet.clone();
539        let wallet_address = self.wallet_address.clone();
540        let subaccount_number = self.subaccount_number;
541        let client_order_id = order.client_order_id();
542        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
543        #[allow(clippy::redundant_clone)]
544        let order_clone = order.clone();
545
546        self.spawn_order_task(
547            "submit_order",
548            order.strategy_id(),
549            order.instrument_id(),
550            order.client_order_id(),
551            async move {
552                let wallet_guard = wallet.read().await;
553                let wallet_ref = wallet_guard
554                    .as_ref()
555                    .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
556
557                let grpc_guard = grpc_client.read().await;
558                let submitter =
559                    OrderSubmitter::new((*grpc_guard).clone(), wallet_address, subaccount_number);
560
561                // Generate client_order_id as u32 (dYdX requires u32 client IDs)
562                // TODO: Implement proper client_order_id to u32 mapping
563                let client_id_u32 = client_order_id.as_str().parse::<u32>().unwrap_or_else(|_| {
564                    // Fallback: use hash of client_order_id
565                    use std::{
566                        collections::hash_map::DefaultHasher,
567                        hash::{Hash, Hasher},
568                    };
569                    let mut hasher = DefaultHasher::new();
570                    client_order_id.as_str().hash(&mut hasher);
571                    (hasher.finish() % (MAX_CLIENT_ID as u64)) as u32
572                });
573
574                // Submit order based on type
575                match order_clone.order_type() {
576                    OrderType::Market => {
577                        submitter
578                            .submit_market_order(
579                                wallet_ref,
580                                client_id_u32,
581                                order_clone.order_side(),
582                                order_clone.quantity(),
583                                block_height,
584                            )
585                            .await?;
586                        tracing::info!("Successfully submitted market order: {}", client_order_id);
587                    }
588                    OrderType::Limit => {
589                        let expire_time = order_clone
590                            .expire_time()
591                            .map(|t| (t.as_u64() / 1_000_000_000) as i64);
592                        submitter
593                            .submit_limit_order(
594                                wallet_ref,
595                                client_id_u32,
596                                order_clone.order_side(),
597                                order_clone
598                                    .price()
599                                    .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
600                                order_clone.quantity(),
601                                order_clone.time_in_force(),
602                                order_clone.is_post_only(),
603                                order_clone.is_reduce_only(),
604                                block_height,
605                                expire_time,
606                            )
607                            .await?;
608                        tracing::info!("Successfully submitted limit order: {}", client_order_id);
609                    }
610                    _ => unreachable!("Order type already validated"),
611                }
612
613                Ok(())
614            },
615        );
616
617        Ok(())
618    }
619
620    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
621        anyhow::bail!("Order lists not supported by dYdX")
622    }
623
624    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
625        anyhow::bail!("Order modification not supported by dYdX")
626    }
627
628    /// Cancels an order on dYdX exchange.
629    ///
630    /// Validates the order state and retrieves instrument details before
631    /// spawning an async task to cancel via gRPC.
632    ///
633    /// # Validation
634    /// - Checks order exists in cache
635    /// - Validates order is not already closed
636    /// - Retrieves instrument from cache for order builder
637    ///
638    /// The `cmd` contains client/venue order IDs. Returns `Ok(())` if cancel request is
639    /// spawned successfully or validation fails gracefully. Returns `Err` if not connected.
640    ///
641    /// # Events
642    /// - `OrderCanceled` - Generated when WebSocket confirms cancellation
643    /// - `OrderCancelRejected` - Generated if exchange rejects cancellation
644    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
645        if !self.is_connected() {
646            anyhow::bail!("Cannot cancel order: not connected");
647        }
648
649        let client_order_id = cmd.client_order_id;
650
651        // Validate order exists in cache and is not closed
652        let cache = self.core.cache();
653        let cache_borrow = cache.borrow();
654
655        let order = match cache_borrow.order(&client_order_id) {
656            Some(order) => order,
657            None => {
658                tracing::error!(
659                    "Cannot cancel order {}: not found in cache",
660                    client_order_id
661                );
662                return Ok(()); // Not an error - order may have been filled/canceled already
663            }
664        };
665
666        // Validate order is not already closed
667        if order.is_closed() {
668            tracing::warn!(
669                "CancelOrder command for {} when order already {} (will not send to exchange)",
670                client_order_id,
671                order.status()
672            );
673            return Ok(());
674        }
675
676        // Retrieve instrument from cache
677        let instrument_id = cmd.instrument_id;
678        let instrument = match cache_borrow.instrument(&instrument_id) {
679            Some(instrument) => instrument,
680            None => {
681                tracing::error!(
682                    "Cannot cancel order {}: instrument {} not found in cache",
683                    client_order_id,
684                    instrument_id
685                );
686                return Ok(()); // Not an error - missing instrument is a cache issue
687            }
688        };
689
690        tracing::debug!(
691            "Cancelling order {} for instrument {}",
692            client_order_id,
693            instrument.id()
694        );
695
696        let grpc_client = self.grpc_client.clone();
697        let wallet = self.wallet.clone();
698        let wallet_address = self.wallet_address.clone();
699        let subaccount_number = self.subaccount_number;
700        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
701        let trader_id = cmd.trader_id;
702        let strategy_id = cmd.strategy_id;
703        let venue_order_id = cmd.venue_order_id;
704
705        self.spawn_task("cancel_order", async move {
706            let wallet_guard = wallet.read().await;
707            let wallet_ref = wallet_guard
708                .as_ref()
709                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
710
711            let grpc_guard = grpc_client.read().await;
712            let submitter =
713                OrderSubmitter::new((*grpc_guard).clone(), wallet_address, subaccount_number);
714
715            // Convert client_order_id to u32 (same logic as submit_order)
716            let client_id_u32 = client_order_id.as_str().parse::<u32>().unwrap_or_else(|_| {
717                // Fallback: use hash of client_order_id
718                use std::{
719                    collections::hash_map::DefaultHasher,
720                    hash::{Hash, Hasher},
721                };
722                let mut hasher = DefaultHasher::new();
723                client_order_id.as_str().hash(&mut hasher);
724                (hasher.finish() % (MAX_CLIENT_ID as u64)) as u32
725            });
726
727            // Attempt cancellation via submitter
728            match submitter
729                .cancel_order(wallet_ref, client_id_u32, block_height)
730                .await
731            {
732                Ok(_) => {
733                    tracing::info!("Successfully cancelled order: {}", client_order_id);
734                }
735                Err(e) => {
736                    tracing::error!("Failed to cancel order {}: {:?}", client_order_id, e);
737
738                    // Generate OrderCancelRejected event
739                    let sender = get_exec_event_sender();
740                    let ts_now = UnixNanos::default();
741                    let event = OrderCancelRejected::new(
742                        trader_id,
743                        strategy_id,
744                        instrument_id,
745                        client_order_id,
746                        format!("Cancel order failed: {e:?}").into(),
747                        UUID4::new(),
748                        ts_now,
749                        ts_now,
750                        false,
751                        Some(venue_order_id),
752                        None, // account_id not available in async context
753                    );
754                    sender
755                        .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
756                        .unwrap();
757                }
758            }
759
760            Ok(())
761        });
762
763        Ok(())
764    }
765
766    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
767        if !self.is_connected() {
768            anyhow::bail!("Cannot cancel orders: not connected");
769        }
770
771        // Query all open orders from cache
772        let cache = self.core.cache().borrow();
773        let mut open_orders: Vec<_> = cache
774            .orders_open(None, None, None, None)
775            .into_iter()
776            .collect();
777
778        // Filter by instrument_id (always specified in command)
779        let instrument_id = cmd.instrument_id;
780        open_orders.retain(|order| order.instrument_id() == instrument_id);
781
782        // Filter by order_side if specified (NoOrderSide means all sides)
783        if cmd.order_side != OrderSide::NoOrderSide {
784            let order_side = cmd.order_side;
785            open_orders.retain(|order| order.order_side() == order_side);
786        }
787
788        // Split orders into short-term and long-term based on TimeInForce
789        // Short-term: IOC, FOK (expire by block height)
790        // Long-term: GTC, GTD, DAY, POST_ONLY (expire by timestamp)
791        let mut short_term_orders = Vec::new();
792        let mut long_term_orders = Vec::new();
793
794        for order in &open_orders {
795            match order.time_in_force() {
796                TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
797                TimeInForce::Gtc
798                | TimeInForce::Gtd
799                | TimeInForce::Day
800                | TimeInForce::AtTheOpen
801                | TimeInForce::AtTheClose => long_term_orders.push(order),
802            }
803        }
804
805        tracing::info!(
806            "[STUB] Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
807            open_orders.len(),
808            short_term_orders.len(),
809            long_term_orders.len(),
810            instrument_id,
811            cmd.order_side
812        );
813
814        // TODO: Implement actual cancellation when proto is generated
815        // Short-term orders: Cancel via MsgCancelOrder (immediate, best-effort)
816        // Long-term orders: Cancel via MsgCancelOrder with good_til_block_time
817        //
818        // Note: dYdX v4 requires separate blockchain transactions for each cancellation.
819        // There is no batch cancel API - each order must be cancelled individually.
820
821        Ok(())
822    }
823
824    fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
825        anyhow::bail!("Batch cancel not supported by dYdX")
826    }
827
828    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
829        Ok(())
830    }
831
832    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
833        Ok(())
834    }
835}
836
837/// Dispatches account state events to the portfolio.
838///
839/// AccountState events are routed to the Portfolio (not ExecEngine) via msgbus.
840/// This follows the pattern used by BitMEX, OKX, and other reference adapters.
841fn dispatch_account_state(state: nautilus_model::events::AccountState) {
842    use std::any::Any;
843    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
844}
845
846/// Dispatches execution reports to the execution engine.
847///
848/// This follows the standard adapter pattern where WebSocket handlers parse messages
849/// into reports, and a dispatch function sends them via the execution event system.
850/// The execution engine then handles cache lookups and event generation.
851///
852/// # Architecture
853///
854/// Per `docs/developer_guide/adapters.md`, adapters should:
855/// 1. Parse WebSocket messages into ExecutionReports in the handler
856/// 2. Dispatch reports via `get_exec_event_sender()`
857/// 3. Let the execution engine handle event generation (has cache access)
858///
859/// This pattern is used by Hyperliquid, OKX, BitMEX, and other reference adapters.
860fn dispatch_execution_report(report: ExecutionReport) {
861    let sender = get_exec_event_sender();
862    match report {
863        ExecutionReport::Order(order_report) => {
864            tracing::debug!(
865                "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
866                order_report.order_status,
867                order_report.venue_order_id,
868                order_report.client_order_id
869            );
870            let exec_report = nautilus_common::messages::ExecutionReport::OrderStatus(order_report);
871            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
872                tracing::warn!("Failed to send order status report: {e}");
873            }
874        }
875        ExecutionReport::Fill(fill_report) => {
876            tracing::debug!(
877                "Dispatching fill report: venue_order_id={}, trade_id={}",
878                fill_report.venue_order_id,
879                fill_report.trade_id
880            );
881            let exec_report = nautilus_common::messages::ExecutionReport::Fill(fill_report);
882            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
883                tracing::warn!("Failed to send fill report: {e}");
884            }
885        }
886    }
887}
888
889#[async_trait(?Send)]
890impl LiveExecutionClient for DydxExecutionClient {
891    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
892        get_exec_event_sender()
893    }
894
895    fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
896        self.core.clock().borrow()
897    }
898
899    async fn connect(&mut self) -> anyhow::Result<()> {
900        if self.connected {
901            tracing::warn!("dYdX execution client already connected");
902            return Ok(());
903        }
904
905        tracing::info!("Connecting to dYdX");
906
907        // Load instruments BEFORE WebSocket connection
908        // Per Python implementation: "instruments are used in the first account channel message"
909        tracing::debug!("Loading instruments from HTTP API");
910        self.http_client.fetch_and_cache_instruments().await?;
911        tracing::info!(
912            "Loaded {} instruments from HTTP",
913            self.http_client.instruments_cache.len()
914        );
915
916        // Populate execution client's instrument lookup maps
917        self.cache_instruments_from_http();
918
919        // Initialize wallet from config if mnemonic is provided
920        if let Some(mnemonic) = &self.config.mnemonic {
921            let wallet = Wallet::from_mnemonic(mnemonic)?;
922            *self.wallet.write().await = Some(wallet);
923            tracing::debug!("Wallet initialized");
924        }
925
926        // Connect WebSocket
927        self.ws_client.connect().await?;
928        tracing::debug!("WebSocket connected");
929
930        // Subscribe to block height updates
931        self.ws_client.subscribe_block_height().await?;
932        tracing::debug!("Subscribed to block height updates");
933
934        // Subscribe to markets for instrument data
935        self.ws_client.subscribe_markets().await?;
936        tracing::debug!("Subscribed to markets");
937
938        // Subscribe to subaccount updates if authenticated
939        if self.config.mnemonic.is_some() {
940            self.ws_client
941                .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
942                .await?;
943            tracing::debug!(
944                "Subscribed to subaccount updates: {}/{}",
945                self.wallet_address,
946                self.subaccount_number
947            );
948
949            // Spawn WebSocket message processing task following standard adapter pattern
950            // Per docs/developer_guide/adapters.md: Parse -> Dispatch -> Engine handles events
951            if let Some(mut rx) = self.ws_client.take_receiver() {
952                use crate::websocket::messages::NautilusWsMessage;
953
954                // Clone data needed for account state parsing in spawned task
955                let account_id = self.core.account_id;
956                let instruments = self.instruments.clone();
957                let oracle_prices = self.oracle_prices.clone();
958                let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
959
960                let handle = tokio::spawn(async move {
961                    while let Some(msg) = rx.recv().await {
962                        match msg {
963                            NautilusWsMessage::Order(report) => {
964                                tracing::debug!("Received order update: {:?}", report.order_status);
965                                dispatch_execution_report(ExecutionReport::Order(report));
966                            }
967                            NautilusWsMessage::Fill(report) => {
968                                tracing::debug!("Received fill update");
969                                dispatch_execution_report(ExecutionReport::Fill(report));
970                            }
971                            NautilusWsMessage::Position(report) => {
972                                tracing::debug!("Received position update");
973                                // Dispatch position status reports via execution event system
974                                let sender = get_exec_event_sender();
975                                let exec_report =
976                                    nautilus_common::messages::ExecutionReport::Position(Box::new(
977                                        *report,
978                                    ));
979                                if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
980                                    tracing::warn!("Failed to send position status report: {e}");
981                                }
982                            }
983                            NautilusWsMessage::AccountState(state) => {
984                                tracing::debug!("Received account state update");
985                                dispatch_account_state(*state);
986                            }
987                            NautilusWsMessage::SubaccountSubscribed(msg) => {
988                                tracing::debug!(
989                                    "Parsing subaccount subscription with full context"
990                                );
991
992                                // Build instruments map for parsing (clone to avoid lifetime issues)
993                                let inst_map: std::collections::HashMap<_, _> = instruments
994                                    .iter()
995                                    .map(|entry| (*entry.key(), entry.value().clone()))
996                                    .collect();
997
998                                // Build oracle prices map (copy Decimals)
999                                let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1000                                    .iter()
1001                                    .map(|entry| (*entry.key(), *entry.value()))
1002                                    .collect();
1003
1004                                let ts_init =
1005                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1006                                let ts_event = ts_init;
1007
1008                                match crate::http::parse::parse_account_state(
1009                                    &msg.contents.subaccount,
1010                                    account_id,
1011                                    &inst_map,
1012                                    &oracle_map,
1013                                    ts_event,
1014                                    ts_init,
1015                                ) {
1016                                    Ok(account_state) => {
1017                                        tracing::info!(
1018                                            "Parsed account state: {} balance(s), {} margin(s)",
1019                                            account_state.balances.len(),
1020                                            account_state.margins.len()
1021                                        );
1022                                        dispatch_account_state(account_state);
1023                                    }
1024                                    Err(e) => {
1025                                        tracing::error!("Failed to parse account state: {e}");
1026                                    }
1027                                }
1028
1029                                // Parse positions from the subscription
1030                                if let Some(ref positions) =
1031                                    msg.contents.subaccount.open_perpetual_positions
1032                                {
1033                                    tracing::debug!(
1034                                        "Parsing {} position(s) from subscription",
1035                                        positions.len()
1036                                    );
1037
1038                                    for (market, ws_position) in positions.iter() {
1039                                        match crate::websocket::parse::parse_ws_position_report(
1040                                            ws_position,
1041                                            &instruments,
1042                                            account_id,
1043                                            ts_init,
1044                                        ) {
1045                                            Ok(report) => {
1046                                                tracing::debug!(
1047                                                    "Parsed position report: {} {} {} {}",
1048                                                    report.instrument_id,
1049                                                    report.position_side,
1050                                                    report.quantity,
1051                                                    market
1052                                                );
1053                                                let sender = get_exec_event_sender();
1054                                                let exec_report =
1055                                                    nautilus_common::messages::ExecutionReport::Position(
1056                                                        Box::new(report),
1057                                                    );
1058                                                if let Err(e) =
1059                                                    sender.send(ExecutionEvent::Report(exec_report))
1060                                                {
1061                                                    tracing::warn!(
1062                                                        "Failed to send position status report: {e}"
1063                                                    );
1064                                                }
1065                                            }
1066                                            Err(e) => {
1067                                                tracing::error!(
1068                                                    "Failed to parse WebSocket position for {}: {e}",
1069                                                    market
1070                                                );
1071                                            }
1072                                        }
1073                                    }
1074                                }
1075                            }
1076                            NautilusWsMessage::SubaccountsChannelData(data) => {
1077                                tracing::debug!("Processing subaccounts channel data");
1078                                let ts_init =
1079                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1080
1081                                // Process orders
1082                                if let Some(ref orders) = data.contents.orders {
1083                                    for ws_order in orders {
1084                                        match crate::websocket::parse::parse_ws_order_report(
1085                                            ws_order,
1086                                            &clob_pair_id_to_instrument,
1087                                            &instruments,
1088                                            account_id,
1089                                            ts_init,
1090                                        ) {
1091                                            Ok(report) => {
1092                                                tracing::debug!(
1093                                                    "Parsed order report: {} {} {} @ {}",
1094                                                    report.instrument_id,
1095                                                    report.order_side,
1096                                                    report.order_status,
1097                                                    report.quantity
1098                                                );
1099                                                let sender = get_exec_event_sender();
1100                                                let exec_report =
1101                                                    nautilus_common::messages::ExecutionReport::OrderStatus(
1102                                                        Box::new(report),
1103                                                    );
1104                                                if let Err(e) =
1105                                                    sender.send(ExecutionEvent::Report(exec_report))
1106                                                {
1107                                                    tracing::warn!(
1108                                                        "Failed to send order status report: {e}"
1109                                                    );
1110                                                }
1111                                            }
1112                                            Err(e) => {
1113                                                tracing::error!(
1114                                                    "Failed to parse WebSocket order: {e}"
1115                                                );
1116                                            }
1117                                        }
1118                                    }
1119                                }
1120
1121                                // Process fills
1122                                if let Some(ref fills) = data.contents.fills {
1123                                    for ws_fill in fills {
1124                                        match crate::websocket::parse::parse_ws_fill_report(
1125                                            ws_fill,
1126                                            &instruments,
1127                                            account_id,
1128                                            ts_init,
1129                                        ) {
1130                                            Ok(report) => {
1131                                                tracing::debug!(
1132                                                    "Parsed fill report: {} {} {} @ {}",
1133                                                    report.instrument_id,
1134                                                    report.venue_order_id,
1135                                                    report.last_qty,
1136                                                    report.last_px
1137                                                );
1138                                                let sender = get_exec_event_sender();
1139                                                let exec_report =
1140                                                    nautilus_common::messages::ExecutionReport::Fill(
1141                                                        Box::new(report),
1142                                                    );
1143                                                if let Err(e) =
1144                                                    sender.send(ExecutionEvent::Report(exec_report))
1145                                                {
1146                                                    tracing::warn!(
1147                                                        "Failed to send fill report: {e}"
1148                                                    );
1149                                                }
1150                                            }
1151                                            Err(e) => {
1152                                                tracing::error!(
1153                                                    "Failed to parse WebSocket fill: {e}"
1154                                                );
1155                                            }
1156                                        }
1157                                    }
1158                                }
1159                            }
1160                            NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1161                                tracing::debug!(
1162                                    "Processing oracle price updates for {} markets",
1163                                    oracle_prices_map.len()
1164                                );
1165
1166                                // Update oracle_prices map with new prices
1167                                for (market_symbol, oracle_data) in oracle_prices_map.iter() {
1168                                    // Parse oracle price
1169                                    match oracle_data.oracle_price.parse::<rust_decimal::Decimal>()
1170                                    {
1171                                        Ok(price) => {
1172                                            // Find instrument by symbol (oracle uses raw symbol like "BTC-USD")
1173                                            // Append "-PERP" to match instrument IDs
1174                                            let symbol_with_perp =
1175                                                format!("{}-PERP", market_symbol);
1176
1177                                            // Find matching instrument
1178                                            if let Some(entry) = instruments.iter().find(|entry| {
1179                                                entry.value().id().symbol.as_str()
1180                                                    == symbol_with_perp
1181                                            }) {
1182                                                let instrument_id = *entry.key();
1183                                                oracle_prices.insert(instrument_id, price);
1184                                                tracing::trace!(
1185                                                    "Updated oracle price for {}: {}",
1186                                                    instrument_id,
1187                                                    price
1188                                                );
1189                                            } else {
1190                                                tracing::debug!(
1191                                                    "No instrument found for market symbol '{}' (tried '{}')",
1192                                                    market_symbol,
1193                                                    symbol_with_perp
1194                                                );
1195                                            }
1196                                        }
1197                                        Err(e) => {
1198                                            tracing::warn!(
1199                                                "Failed to parse oracle price for {}: {}",
1200                                                market_symbol,
1201                                                e
1202                                            );
1203                                        }
1204                                    }
1205                                }
1206                            }
1207                            NautilusWsMessage::Error(err) => {
1208                                tracing::error!("WebSocket error: {:?}", err);
1209                            }
1210                            NautilusWsMessage::Reconnected => {
1211                                tracing::info!("WebSocket reconnected");
1212                            }
1213                            _ => {
1214                                // Data, Deltas are for market data, not execution
1215                            }
1216                        }
1217                    }
1218                    tracing::info!("WebSocket message processing task ended");
1219                });
1220
1221                self.ws_stream_handle = Some(handle);
1222                tracing::debug!("Spawned WebSocket message processing task");
1223            }
1224        }
1225        self.connected = true;
1226        tracing::info!(client_id = %self.core.client_id, "Connected");
1227        Ok(())
1228    }
1229
1230    async fn disconnect(&mut self) -> anyhow::Result<()> {
1231        if !self.connected {
1232            tracing::warn!("dYdX execution client not connected");
1233            return Ok(());
1234        }
1235
1236        tracing::info!("Disconnecting from dYdX");
1237
1238        // Unsubscribe from subaccount updates if authenticated
1239        if self.config.mnemonic.is_some() {
1240            let _ = self
1241                .ws_client
1242                .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1243                .await
1244                .map_err(|e| tracing::warn!("Failed to unsubscribe from subaccount: {e}"));
1245        }
1246
1247        // Unsubscribe from markets
1248        let _ = self
1249            .ws_client
1250            .unsubscribe_markets()
1251            .await
1252            .map_err(|e| tracing::warn!("Failed to unsubscribe from markets: {e}"));
1253
1254        // Unsubscribe from block height
1255        let _ = self
1256            .ws_client
1257            .unsubscribe_block_height()
1258            .await
1259            .map_err(|e| tracing::warn!("Failed to unsubscribe from block height: {e}"));
1260
1261        // Disconnect WebSocket
1262        self.ws_client.disconnect().await?;
1263
1264        // Abort WebSocket message processing task
1265        if let Some(handle) = self.ws_stream_handle.take() {
1266            handle.abort();
1267            tracing::debug!("Aborted WebSocket message processing task");
1268        }
1269
1270        // Abort any pending tasks
1271        self.abort_pending_tasks();
1272
1273        self.connected = false;
1274        tracing::info!(client_id = %self.core.client_id, "Disconnected");
1275        Ok(())
1276    }
1277
1278    async fn generate_order_status_report(
1279        &self,
1280        cmd: &GenerateOrderStatusReport,
1281    ) -> anyhow::Result<Option<OrderStatusReport>> {
1282        use anyhow::Context;
1283
1284        // Query single order from dYdX API
1285        let response = self
1286            .http_client
1287            .inner
1288            .get_orders(
1289                &self.wallet_address,
1290                self.subaccount_number,
1291                None,    // market filter
1292                Some(1), // limit to 1 result
1293            )
1294            .await
1295            .context("failed to fetch order from dYdX API")?;
1296
1297        if response.orders.is_empty() {
1298            return Ok(None);
1299        }
1300
1301        let order = &response.orders[0];
1302        let ts_init = UnixNanos::default();
1303
1304        // Get instrument by clob_pair_id
1305        let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1306            Some(inst) => inst,
1307            None => return Ok(None),
1308        };
1309
1310        // Parse to OrderStatusReport
1311        let report = crate::http::parse::parse_order_status_report(
1312            order,
1313            &instrument,
1314            self.core.account_id,
1315            ts_init,
1316        )
1317        .context("failed to parse order status report")?;
1318
1319        // Filter by client_order_id if specified
1320        if let Some(client_order_id) = cmd.client_order_id
1321            && report.client_order_id != Some(client_order_id)
1322        {
1323            return Ok(None);
1324        }
1325
1326        // Filter by venue_order_id if specified
1327        if let Some(venue_order_id) = cmd.venue_order_id
1328            && report.venue_order_id.as_str() != venue_order_id.as_str()
1329        {
1330            return Ok(None);
1331        }
1332
1333        // Filter by instrument_id if specified
1334        if let Some(instrument_id) = cmd.instrument_id
1335            && report.instrument_id != instrument_id
1336        {
1337            return Ok(None);
1338        }
1339
1340        Ok(Some(report))
1341    }
1342
1343    async fn generate_order_status_reports(
1344        &self,
1345        cmd: &GenerateOrderStatusReport,
1346    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1347        use anyhow::Context;
1348
1349        // Query orders from dYdX API
1350        let response = self
1351            .http_client
1352            .inner
1353            .get_orders(
1354                &self.wallet_address,
1355                self.subaccount_number,
1356                None, // market filter
1357                None, // limit
1358            )
1359            .await
1360            .context("failed to fetch orders from dYdX API")?;
1361
1362        let mut reports = Vec::new();
1363        let ts_init = UnixNanos::default();
1364
1365        for order in response.orders {
1366            // Get instrument by clob_pair_id using efficient lookup
1367            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1368                Some(inst) => inst,
1369                None => continue,
1370            };
1371
1372            // Filter by instrument_id if specified
1373            if let Some(filter_id) = cmd.instrument_id
1374                && instrument.id() != filter_id
1375            {
1376                continue;
1377            }
1378
1379            // Parse to OrderStatusReport
1380            match crate::http::parse::parse_order_status_report(
1381                &order,
1382                &instrument,
1383                self.core.account_id,
1384                ts_init,
1385            ) {
1386                Ok(report) => {
1387                    // Filter by client_order_id if specified
1388                    if let Some(client_order_id) = cmd.client_order_id
1389                        && report.client_order_id != Some(client_order_id)
1390                    {
1391                        continue;
1392                    }
1393
1394                    // Filter by venue_order_id if specified
1395                    if let Some(venue_order_id) = cmd.venue_order_id
1396                        && report.venue_order_id.as_str() != venue_order_id.as_str()
1397                    {
1398                        continue;
1399                    }
1400
1401                    reports.push(report);
1402                }
1403                Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1404            }
1405        }
1406
1407        tracing::info!("Generated {} order status reports", reports.len());
1408        Ok(reports)
1409    }
1410
1411    async fn generate_fill_reports(
1412        &self,
1413        cmd: GenerateFillReports,
1414    ) -> anyhow::Result<Vec<FillReport>> {
1415        use anyhow::Context;
1416
1417        // Query fills from dYdX API
1418        let response = self
1419            .http_client
1420            .inner
1421            .get_fills(
1422                &self.wallet_address,
1423                self.subaccount_number,
1424                None, // market filter
1425                None, // limit
1426            )
1427            .await
1428            .context("failed to fetch fills from dYdX API")?;
1429
1430        let mut reports = Vec::new();
1431        let ts_init = UnixNanos::default();
1432
1433        for fill in response.fills {
1434            // Get instrument by market ticker using efficient lookup
1435            let instrument = match self.get_instrument_by_market(&fill.market) {
1436                Some(inst) => inst,
1437                None => {
1438                    tracing::warn!(
1439                        "Instrument for market {} not found in cache, skipping fill {}",
1440                        fill.market,
1441                        fill.id
1442                    );
1443                    continue;
1444                }
1445            };
1446
1447            // Filter by instrument_id if specified
1448            if let Some(filter_id) = cmd.instrument_id
1449                && instrument.id() != filter_id
1450            {
1451                continue;
1452            }
1453
1454            // Parse to FillReport
1455            match crate::http::parse::parse_fill_report(
1456                &fill,
1457                &instrument,
1458                self.core.account_id,
1459                ts_init,
1460            ) {
1461                Ok(report) => {
1462                    // Filter by venue_order_id if specified
1463                    if let Some(venue_order_id) = cmd.venue_order_id
1464                        && report.venue_order_id.as_str() != venue_order_id.as_str()
1465                    {
1466                        continue;
1467                    }
1468
1469                    // Filter by time range if specified
1470                    if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1471                        if report.ts_event >= start && report.ts_event <= end {
1472                            reports.push(report);
1473                        }
1474                    } else if let Some(start) = cmd.start {
1475                        if report.ts_event >= start {
1476                            reports.push(report);
1477                        }
1478                    } else if let Some(end) = cmd.end {
1479                        if report.ts_event <= end {
1480                            reports.push(report);
1481                        }
1482                    } else {
1483                        reports.push(report);
1484                    }
1485                }
1486                Err(e) => tracing::error!("Failed to parse fill report: {e}"),
1487            }
1488        }
1489
1490        tracing::info!("Generated {} fill reports", reports.len());
1491        Ok(reports)
1492    }
1493
1494    async fn generate_position_status_reports(
1495        &self,
1496        cmd: &GeneratePositionReports,
1497    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1498        use anyhow::Context;
1499
1500        // Query subaccount data from dYdX API to get positions
1501        let response = self
1502            .http_client
1503            .inner
1504            .get_subaccount(&self.wallet_address, self.subaccount_number)
1505            .await
1506            .context("failed to fetch subaccount from dYdX API")?;
1507
1508        let mut reports = Vec::new();
1509        let ts_init = UnixNanos::default();
1510
1511        // Iterate through open perpetual positions
1512        for (market_ticker, position) in &response.subaccount.open_perpetual_positions {
1513            // Get instrument by market ticker using efficient lookup
1514            let instrument = match self.get_instrument_by_market(market_ticker) {
1515                Some(inst) => inst,
1516                None => {
1517                    tracing::warn!(
1518                        "Instrument for market {} not found in cache, skipping position",
1519                        market_ticker
1520                    );
1521                    continue;
1522                }
1523            };
1524
1525            // Filter by instrument_id if specified
1526            if let Some(filter_id) = cmd.instrument_id
1527                && instrument.id() != filter_id
1528            {
1529                continue;
1530            }
1531
1532            // Parse to PositionStatusReport
1533            match crate::http::parse::parse_position_status_report(
1534                position,
1535                &instrument,
1536                self.core.account_id,
1537                ts_init,
1538            ) {
1539                Ok(report) => reports.push(report),
1540                Err(e) => {
1541                    tracing::error!("Failed to parse position status report: {e}");
1542                }
1543            }
1544        }
1545
1546        tracing::info!("Generated {} position status reports", reports.len());
1547        Ok(reports)
1548    }
1549
1550    async fn generate_mass_status(
1551        &self,
1552        lookback_mins: Option<u64>,
1553    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1554        use anyhow::Context;
1555
1556        tracing::info!(
1557            "Generating mass execution status{}",
1558            lookback_mins.map_or_else(
1559                || " (unbounded)".to_string(),
1560                |mins| format!(" (lookback: {} minutes)", mins)
1561            )
1562        );
1563
1564        // Calculate cutoff time if lookback is specified
1565        let cutoff_time =
1566            lookback_mins.map(|mins| chrono::Utc::now() - chrono::Duration::minutes(mins as i64));
1567
1568        // Query all orders
1569        let orders_response = self
1570            .http_client
1571            .inner
1572            .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1573            .await
1574            .context("failed to fetch orders for mass status")?;
1575
1576        // Query subaccount for positions
1577        let subaccount_response = self
1578            .http_client
1579            .inner
1580            .get_subaccount(&self.wallet_address, self.subaccount_number)
1581            .await
1582            .context("failed to fetch subaccount for mass status")?;
1583
1584        // Query fills
1585        let fills_response = self
1586            .http_client
1587            .inner
1588            .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1589            .await
1590            .context("failed to fetch fills for mass status")?;
1591
1592        let ts_init = UnixNanos::default();
1593        let mut order_reports = Vec::new();
1594        let mut position_reports = Vec::new();
1595        let mut fill_reports = Vec::new();
1596
1597        // Counters for logging (only used when filtering is active)
1598        let mut orders_filtered = 0;
1599        let mut fills_filtered = 0;
1600
1601        // Parse orders (with optional time filtering)
1602        for order in orders_response.orders {
1603            // Filter by time window if specified (use updated_at for orders)
1604            if let Some(cutoff) = cutoff_time
1605                && order.updated_at < cutoff
1606            {
1607                orders_filtered += 1;
1608                continue;
1609            }
1610
1611            if let Some(instrument) = self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1612                match crate::http::parse::parse_order_status_report(
1613                    &order,
1614                    &instrument,
1615                    self.core.account_id,
1616                    ts_init,
1617                ) {
1618                    Ok(report) => order_reports.push(report),
1619                    Err(e) => tracing::error!("Failed to parse order in mass status: {e}"),
1620                }
1621            }
1622        }
1623
1624        // Parse positions (no time filtering - positions are current state)
1625        for (market_ticker, position) in &subaccount_response.subaccount.open_perpetual_positions {
1626            if let Some(instrument) = self.get_instrument_by_market(market_ticker) {
1627                match crate::http::parse::parse_position_status_report(
1628                    position,
1629                    &instrument,
1630                    self.core.account_id,
1631                    ts_init,
1632                ) {
1633                    Ok(report) => position_reports.push(report),
1634                    Err(e) => tracing::error!("Failed to parse position in mass status: {e}"),
1635                }
1636            }
1637        }
1638
1639        // Parse fills (with optional time filtering)
1640        for fill in fills_response.fills {
1641            // Filter by time window if specified
1642            if let Some(cutoff) = cutoff_time
1643                && fill.created_at < cutoff
1644            {
1645                fills_filtered += 1;
1646                continue;
1647            }
1648
1649            if let Some(instrument) = self.get_instrument_by_market(&fill.market) {
1650                match crate::http::parse::parse_fill_report(
1651                    &fill,
1652                    &instrument,
1653                    self.core.account_id,
1654                    ts_init,
1655                ) {
1656                    Ok(report) => fill_reports.push(report),
1657                    Err(e) => tracing::error!("Failed to parse fill in mass status: {e}"),
1658                }
1659            }
1660        }
1661
1662        if cutoff_time.is_some() {
1663            tracing::info!(
1664                "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1665                order_reports.len(),
1666                orders_filtered,
1667                position_reports.len(),
1668                fill_reports.len(),
1669                fills_filtered
1670            );
1671        } else {
1672            tracing::info!(
1673                "Generated mass status: {} orders, {} positions, {} fills",
1674                order_reports.len(),
1675                position_reports.len(),
1676                fill_reports.len()
1677            );
1678        }
1679
1680        // Create mass status and add reports
1681        let mut mass_status = ExecutionMassStatus::new(
1682            self.core.client_id,
1683            self.core.account_id,
1684            self.core.venue,
1685            ts_init,
1686            None, // report_id will be auto-generated
1687        );
1688
1689        mass_status.add_order_reports(order_reports);
1690        mass_status.add_position_reports(position_reports);
1691        mass_status.add_fill_reports(fill_reports);
1692
1693        Ok(Some(mass_status))
1694    }
1695}
1696
1697#[cfg(test)]
1698mod tests {
1699    use nautilus_model::{
1700        enums::{OrderSide, OrderType, TimeInForce},
1701        events::order::initialized::OrderInitializedBuilder,
1702        identifiers::{ClientOrderId, InstrumentId, StrategyId, TraderId},
1703        orders::OrderAny,
1704        types::{Price, Quantity},
1705    };
1706    use rstest::rstest;
1707
1708    use super::*;
1709
1710    /// Test that client order ID parsing to u32 works for numeric strings
1711    #[rstest]
1712    fn test_client_order_id_numeric_parsing() {
1713        let client_id = "12345";
1714        let result: Result<u32, _> = client_id.parse();
1715        assert!(result.is_ok());
1716        assert_eq!(result.unwrap(), 12345);
1717    }
1718
1719    /// Test that client order ID hashing works for non-numeric strings
1720    #[rstest]
1721    fn test_client_order_id_hash_fallback() {
1722        use std::{
1723            collections::hash_map::DefaultHasher,
1724            hash::{Hash, Hasher},
1725        };
1726
1727        let client_id = "O-20241112-ABC123-001";
1728        let parse_result: Result<u32, _> = client_id.parse();
1729        assert!(parse_result.is_err());
1730
1731        // Fallback to hash
1732        let mut hasher = DefaultHasher::new();
1733        client_id.hash(&mut hasher);
1734        let hash_result = (hasher.finish() % (MAX_CLIENT_ID as u64)) as u32;
1735
1736        assert!(hash_result < MAX_CLIENT_ID);
1737        assert!(hash_result > 0); // Very unlikely to be 0
1738    }
1739
1740    /// Test that unsupported order types are properly rejected
1741    #[rstest]
1742    fn test_unsupported_order_type_rejection() {
1743        // Test that StopMarket is currently rejected
1744        let order_type = OrderType::StopMarket;
1745        let is_supported = matches!(order_type, OrderType::Market | OrderType::Limit);
1746        assert!(!is_supported);
1747
1748        // Test that StopLimit is currently rejected
1749        let order_type = OrderType::StopLimit;
1750        let is_supported = matches!(order_type, OrderType::Market | OrderType::Limit);
1751        assert!(!is_supported);
1752    }
1753
1754    /// Test that supported order types are accepted
1755    #[rstest]
1756    fn test_supported_order_types() {
1757        let market = OrderType::Market;
1758        assert!(matches!(market, OrderType::Market | OrderType::Limit));
1759
1760        let limit = OrderType::Limit;
1761        assert!(matches!(limit, OrderType::Market | OrderType::Limit));
1762    }
1763
1764    /// Test UnixNanos to seconds conversion for expire_time
1765    #[rstest]
1766    fn test_unix_nanos_to_seconds_conversion() {
1767        use nautilus_core::UnixNanos;
1768
1769        // Test conversion of 1 second
1770        let one_second = UnixNanos::from(1_000_000_000_u64);
1771        let seconds = (one_second.as_u64() / 1_000_000_000) as i64;
1772        assert_eq!(seconds, 1);
1773
1774        // Test conversion of 1 hour
1775        let one_hour = UnixNanos::from(3_600_000_000_000_u64);
1776        let seconds = (one_hour.as_u64() / 1_000_000_000) as i64;
1777        assert_eq!(seconds, 3600);
1778
1779        // Test current timestamp (should be reasonable)
1780        let now = UnixNanos::from(1_731_398_400_000_000_000_u64); // 2024-11-12
1781        let seconds = (now.as_u64() / 1_000_000_000) as i64;
1782        assert_eq!(seconds, 1_731_398_400);
1783    }
1784
1785    /// Test that OrderAny API methods work correctly
1786    #[rstest]
1787    fn test_order_any_api_usage() {
1788        let order = OrderInitializedBuilder::default()
1789            .trader_id(TraderId::from("TRADER-001"))
1790            .strategy_id(StrategyId::from("STRATEGY-001"))
1791            .instrument_id(InstrumentId::from("ETH-USD-PERP.DYDX"))
1792            .client_order_id(ClientOrderId::from("O-001"))
1793            .order_side(OrderSide::Buy)
1794            .order_type(OrderType::Limit)
1795            .quantity(Quantity::from("10"))
1796            .price(Some(Price::from("2000.50")))
1797            .time_in_force(TimeInForce::Gtc)
1798            .build()
1799            .unwrap();
1800
1801        let order_any: OrderAny = order.into();
1802
1803        // Test OrderAny methods
1804        assert_eq!(order_any.order_side(), OrderSide::Buy);
1805        assert_eq!(order_any.order_type(), OrderType::Limit);
1806        assert_eq!(order_any.quantity(), Quantity::from("10"));
1807        assert_eq!(order_any.price(), Some(Price::from("2000.50")));
1808        assert_eq!(order_any.time_in_force(), TimeInForce::Gtc);
1809        assert!(!order_any.is_post_only());
1810        assert!(!order_any.is_reduce_only());
1811        assert_eq!(order_any.expire_time(), None);
1812    }
1813
1814    /// Test MAX_CLIENT_ID constant is within dYdX limits
1815    #[rstest]
1816    fn test_max_client_id_limit() {
1817        // dYdX requires client IDs to be u32
1818        assert_eq!(MAX_CLIENT_ID, u32::MAX);
1819    }
1820
1821    /// Test that client order ID conversion is consistent for cancel operations
1822    #[rstest]
1823    fn test_cancel_order_id_consistency() {
1824        use std::{
1825            collections::hash_map::DefaultHasher,
1826            hash::{Hash, Hasher},
1827        };
1828
1829        let client_id_str = "O-20241112-CANCEL-001";
1830
1831        // First conversion (for submit)
1832        let mut hasher1 = DefaultHasher::new();
1833        client_id_str.hash(&mut hasher1);
1834        let id1 = (hasher1.finish() % (MAX_CLIENT_ID as u64)) as u32;
1835
1836        // Second conversion (for cancel) - should be identical
1837        let mut hasher2 = DefaultHasher::new();
1838        client_id_str.hash(&mut hasher2);
1839        let id2 = (hasher2.finish() % (MAX_CLIENT_ID as u64)) as u32;
1840
1841        assert_eq!(id1, id2, "Client ID conversion must be deterministic");
1842    }
1843
1844    /// Test clob_pair_id extraction from CryptoPerpetual raw_symbol
1845    #[rstest]
1846    fn test_clob_pair_id_extraction_from_raw_symbol() {
1847        // Simulate raw_symbol "1" -> clob_pair_id 1
1848        let raw_symbol = "1";
1849        let result: Result<u32, _> = raw_symbol.parse();
1850        assert!(result.is_ok());
1851        assert_eq!(result.unwrap(), 1);
1852
1853        // Simulate raw_symbol "42" -> clob_pair_id 42
1854        let raw_symbol = "42";
1855        let result: Result<u32, _> = raw_symbol.parse();
1856        assert!(result.is_ok());
1857        assert_eq!(result.unwrap(), 42);
1858    }
1859
1860    /// Test clob_pair_id extraction failure for invalid raw_symbol
1861    #[rstest]
1862    fn test_clob_pair_id_extraction_invalid() {
1863        // Invalid raw_symbol should fail parsing
1864        let raw_symbol = "BTC-USD";
1865        let result: Result<u32, _> = raw_symbol.parse();
1866        assert!(result.is_err());
1867
1868        // Empty raw_symbol should fail
1869        let raw_symbol = "";
1870        let result: Result<u32, _> = raw_symbol.parse();
1871        assert!(result.is_err());
1872    }
1873
1874    /// Test market ticker to instrument mapping logic
1875    #[rstest]
1876    fn test_market_ticker_parsing() {
1877        let ticker = "BTC-USD";
1878        let parts: Vec<&str> = ticker.split('-').collect();
1879        assert_eq!(parts.len(), 2);
1880        assert_eq!(parts[0], "BTC");
1881        assert_eq!(parts[1], "USD");
1882
1883        let ticker = "ETH-USD";
1884        let parts: Vec<&str> = ticker.split('-').collect();
1885        assert_eq!(parts.len(), 2);
1886        assert_eq!(parts[0], "ETH");
1887        assert_eq!(parts[1], "USD");
1888    }
1889
1890    /// Test market ticker with invalid format
1891    #[rstest]
1892    fn test_market_ticker_invalid_format() {
1893        let ticker = "BTCUSD";
1894        let parts: Vec<&str> = ticker.split('-').collect();
1895        assert_eq!(parts.len(), 1); // No separator
1896
1897        let ticker = "BTC-USD-PERP";
1898        let parts: Vec<&str> = ticker.split('-').collect();
1899        assert_eq!(parts.len(), 3); // Too many parts
1900    }
1901}