nautilus_dydx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the dYdX adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::DashMap;
25use nautilus_common::{
26    messages::{
27        DataEvent, DataResponse,
28        data::{
29            BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
30            RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31            SubscribeBookSnapshots, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
32            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
33            UnsubscribeBookSnapshots, UnsubscribeInstrument, UnsubscribeInstruments,
34            UnsubscribeQuotes, UnsubscribeTrades,
35        },
36    },
37    runner::get_data_event_sender,
38};
39use nautilus_core::{
40    UnixNanos,
41    time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_data::client::DataClient;
44use nautilus_model::{
45    data::{
46        Bar, BarType, BookOrder, Data as NautilusData, OrderBookDelta, OrderBookDeltas_API,
47        QuoteTick,
48    },
49    enums::{BookAction, OrderSide, RecordFlag},
50    identifiers::{ClientId, InstrumentId, Venue},
51    instruments::{Instrument, InstrumentAny},
52    orderbook::OrderBook,
53    types::{Price, Quantity, price::PriceRaw},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60    common::consts::DYDX_VENUE, config::DydxDataClientConfig, http::client::DydxHttpClient,
61    websocket::client::DydxWebSocketClient,
62};
63
/// Groups WebSocket message handling dependencies.
///
/// Every field is a borrow, so building a context is cheap; `connect` constructs a fresh one
/// for each inbound message before passing it to `handle_ws_message`.
struct WsMessageContext<'a> {
    /// Channel sender used to emit [`DataEvent`]s.
    data_sender: &'a tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by `Ustr` (the client documents this as keyed by symbol).
    instruments: &'a Arc<DashMap<Ustr, InstrumentAny>>,
    /// Local order books, one per instrument ID.
    order_books: &'a Arc<DashMap<InstrumentId, OrderBook>>,
    /// Most recent quote tick per instrument ID.
    last_quotes: &'a Arc<DashMap<InstrumentId, QuoteTick>>,
    /// WebSocket client handle, `None` when the data client was built without one.
    ws_client: &'a Option<DydxWebSocketClient>,
    /// Instruments with a tracked order book subscription.
    active_orderbook_subs: &'a Arc<DashMap<InstrumentId, ()>>,
    /// Instruments with a tracked trade subscription.
    active_trade_subs: &'a Arc<DashMap<InstrumentId, ()>>,
    /// Tracked bar subscriptions: (instrument ID, resolution string) mapped to the `BarType`.
    active_bar_subs: &'a Arc<DashMap<(InstrumentId, String), BarType>>,
}
75
/// dYdX data client for live market data streaming and historical data requests.
///
/// This client integrates with the Nautilus DataEngine to provide:
/// - Real-time market data via WebSocket subscriptions
/// - Historical data via REST API requests
/// - Automatic instrument discovery and caching
/// - Connection lifecycle management
///
/// The map-typed fields are `Arc<DashMap<..>>` handles so they can be cloned into spawned
/// tasks (as `connect` does for the WebSocket message task).
#[derive(Debug)]
pub struct DydxDataClient {
    /// The client ID for this data client.
    client_id: ClientId,
    /// Configuration for the data client.
    config: DydxDataClientConfig,
    /// HTTP client for REST API requests.
    http_client: DydxHttpClient,
    /// WebSocket client for real-time data streaming (optional).
    ///
    /// When `None`, the subscription methods that call `ws_client()` return an error.
    ws_client: Option<DydxWebSocketClient>,
    /// Whether the client is currently connected.
    is_connected: AtomicBool,
    /// Cancellation token for async operations.
    cancellation_token: CancellationToken,
    /// Background task handles.
    tasks: Vec<JoinHandle<()>>,
    /// Channel sender for emitting data events to the DataEngine.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Cached instruments by symbol (shared with HTTP client via `Arc<DashMap<Ustr, InstrumentAny>>`).
    ///
    /// `new` obtains this handle by cloning `http_client.instruments()`.
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// High-resolution clock for timestamps.
    clock: &'static AtomicTime,
    /// Local order books maintained for generating quotes and resolving crosses.
    // NOTE(review): `connect` clones this handle into the message task, so the `dead_code`
    // allowance may no longer be required — confirm before removing.
    #[allow(dead_code)]
    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
    /// Last quote tick per instrument (used for quote generation from book deltas).
    // NOTE(review): also cloned in `connect`; the `dead_code` allowance may be stale — confirm.
    #[allow(dead_code)]
    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
    /// Incomplete bars cache (bars not yet closed, keyed by BarType).
    #[allow(dead_code)]
    incomplete_bars: Arc<DashMap<BarType, Bar>>,
    /// Map WebSocket topic strings to BarType for bar subscriptions.
    #[allow(dead_code)]
    bar_type_mappings: Arc<DashMap<String, BarType>>,
    /// Active orderbook subscriptions for periodic snapshot refresh.
    active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
    /// Active trade subscriptions for reconnection recovery.
    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
    /// Active bar/candle subscriptions for reconnection recovery (maps instrument+resolution to BarType).
    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
}
124
125impl DydxDataClient {
126    /// Maps Nautilus BarType spec to dYdX candle resolution string.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the bar aggregation or step is not supported by dYdX.
131    #[allow(dead_code)]
132    fn map_bar_spec_to_resolution(
133        spec: &nautilus_model::data::BarSpecification,
134    ) -> anyhow::Result<&'static str> {
135        use nautilus_model::enums::BarAggregation;
136
137        match spec.step.get() {
138            1 => match spec.aggregation {
139                BarAggregation::Minute => Ok("1MIN"),
140                BarAggregation::Hour => Ok("1HOUR"),
141                BarAggregation::Day => Ok("1DAY"),
142                _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
143            },
144            5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
145            15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
146            30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
147            4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
148            step => anyhow::bail!(
149                "Unsupported bar step: {step} with aggregation {:?}",
150                spec.aggregation
151            ),
152        }
153    }
154
155    /// Creates a new [`DydxDataClient`] instance.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the client fails to initialize.
160    pub fn new(
161        client_id: ClientId,
162        config: DydxDataClientConfig,
163        http_client: DydxHttpClient,
164        ws_client: Option<DydxWebSocketClient>,
165    ) -> anyhow::Result<Self> {
166        let clock = get_atomic_clock_realtime();
167        let data_sender = get_data_event_sender();
168
169        // Clone the instruments cache before moving http_client
170        let instruments_cache = http_client.instruments().clone();
171
172        Ok(Self {
173            client_id,
174            config,
175            http_client,
176            ws_client,
177            is_connected: AtomicBool::new(false),
178            cancellation_token: CancellationToken::new(),
179            tasks: Vec::new(),
180            data_sender,
181            instruments: instruments_cache,
182            clock,
183            order_books: Arc::new(DashMap::new()),
184            last_quotes: Arc::new(DashMap::new()),
185            incomplete_bars: Arc::new(DashMap::new()),
186            bar_type_mappings: Arc::new(DashMap::new()),
187            active_orderbook_subs: Arc::new(DashMap::new()),
188            active_trade_subs: Arc::new(DashMap::new()),
189            active_bar_subs: Arc::new(DashMap::new()),
190        })
191    }
192
    /// Returns the venue for this data client.
    ///
    /// This is always a copy of the `DYDX_VENUE` constant; it does not depend on
    /// the client's configuration or connection state.
    #[must_use]
    pub fn venue(&self) -> Venue {
        *DYDX_VENUE
    }
198
199    fn ws_client(&self) -> anyhow::Result<&DydxWebSocketClient> {
200        self.ws_client
201            .as_ref()
202            .context("websocket client not initialized; call connect first")
203    }
204
205    #[allow(dead_code)]
206    fn ws_client_mut(&mut self) -> anyhow::Result<&mut DydxWebSocketClient> {
207        self.ws_client
208            .as_mut()
209            .context("websocket client not initialized; call connect first")
210    }
211
    /// Returns `true` when the client is connected.
    ///
    /// Reads the `is_connected` flag with relaxed ordering; the flag is set at the end of
    /// `connect` and cleared by `disconnect`, `stop` and `reset`.
    #[must_use]
    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
217
218    /// Spawns an async WebSocket task with error handling.
219    ///
220    /// This helper ensures consistent error logging across all subscription methods.
221    fn spawn_ws<F>(&self, fut: F, context: &'static str)
222    where
223        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
224    {
225        tokio::spawn(async move {
226            if let Err(e) = fut.await {
227                tracing::error!("{context}: {e:?}");
228            }
229        });
230    }
231
232    /// Bootstrap instruments from the dYdX Indexer API.
233    ///
234    /// This method:
235    /// 1. Fetches all available instruments from the REST API
236    /// 2. Caches them in the HTTP client
237    /// 3. Caches them in the WebSocket client (if present)
238    /// 4. Populates the local instruments cache
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if:
243    /// - The HTTP request fails.
244    /// - Instrument parsing fails.
245    ///
246    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
247        tracing::info!("Bootstrapping dYdX instruments");
248
249        // Fetch instruments from HTTP API
250        // Note: maker_fee and taker_fee can be None initially - they'll be set to zero
251        let instruments = self
252            .http_client
253            .request_instruments(None, None, None)
254            .await
255            .context("failed to load instruments from dYdX")?;
256
257        if instruments.is_empty() {
258            tracing::warn!("No dYdX instruments were loaded");
259            return Ok(instruments);
260        }
261
262        tracing::info!("Loaded {} dYdX instruments", instruments.len());
263
264        // Cache instruments in HTTP client (request_instruments does NOT cache automatically)
265        self.http_client.cache_instruments(instruments.clone());
266
267        // Cache in WebSocket client if present
268        if let Some(ref ws) = self.ws_client {
269            ws.cache_instruments(instruments.clone());
270        }
271
272        Ok(instruments)
273    }
274}
275
276// Implement DataClient trait for integration with Nautilus DataEngine
277#[async_trait::async_trait]
278impl DataClient for DydxDataClient {
    /// Returns the client ID this data client was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
282
283    fn venue(&self) -> Option<Venue> {
284        Some(*DYDX_VENUE)
285    }
286
287    fn start(&mut self) -> anyhow::Result<()> {
288        tracing::info!(
289            client_id = %self.client_id,
290            is_testnet = self.http_client.is_testnet(),
291            "Starting dYdX data client"
292        );
293        Ok(())
294    }
295
    /// Stops the client by cancelling the shared cancellation token and clearing the
    /// connected flag.
    ///
    /// Handles in `self.tasks` are neither aborted nor awaited here, and the WebSocket
    /// client is not disconnected.
    fn stop(&mut self) -> anyhow::Result<()> {
        tracing::info!("Stopping dYdX data client {}", self.client_id);
        // Signals cancellation to every holder of a clone of this token.
        self.cancellation_token.cancel();
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }
302
303    fn reset(&mut self) -> anyhow::Result<()> {
304        tracing::debug!("Resetting dYdX data client {}", self.client_id);
305        self.is_connected.store(false, Ordering::Relaxed);
306        self.cancellation_token = CancellationToken::new();
307        self.tasks.clear();
308        Ok(())
309    }
310
    /// Disposes the client; after logging, this simply delegates to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disposing dYdX data client {}", self.client_id);
        self.stop()
    }
315
316    async fn connect(&mut self) -> anyhow::Result<()> {
317        if self.is_connected() {
318            return Ok(());
319        }
320
321        tracing::info!("Connecting dYdX data client");
322
323        // Bootstrap instruments first
324        self.bootstrap_instruments().await?;
325
326        // Connect WebSocket client and subscribe to market updates
327        if let Some(ref mut ws) = self.ws_client {
328            ws.connect()
329                .await
330                .context("failed to connect dYdX websocket")?;
331
332            ws.subscribe_markets()
333                .await
334                .context("failed to subscribe to markets channel")?;
335
336            // Start message processing task (handler already converts to NautilusWsMessage)
337            if let Some(rx) = ws.take_receiver() {
338                let data_tx = self.data_sender.clone();
339                let instruments = self.instruments.clone();
340                let order_books = self.order_books.clone();
341                let last_quotes = self.last_quotes.clone();
342                let ws_client = self.ws_client.clone();
343                let active_orderbook_subs = self.active_orderbook_subs.clone();
344                let active_trade_subs = self.active_trade_subs.clone();
345                let active_bar_subs = self.active_bar_subs.clone();
346
347                let task = tokio::spawn(async move {
348                    let mut rx = rx;
349                    while let Some(msg) = rx.recv().await {
350                        let ctx = WsMessageContext {
351                            data_sender: &data_tx,
352                            instruments: &instruments,
353                            order_books: &order_books,
354                            last_quotes: &last_quotes,
355                            ws_client: &ws_client,
356                            active_orderbook_subs: &active_orderbook_subs,
357                            active_trade_subs: &active_trade_subs,
358                            active_bar_subs: &active_bar_subs,
359                        };
360                        Self::handle_ws_message(msg, &ctx);
361                    }
362                });
363                self.tasks.push(task);
364            } else {
365                tracing::warn!("No inbound WS receiver available after connect");
366            }
367        }
368
369        // Start orderbook snapshot refresh task
370        self.start_orderbook_refresh_task()?;
371
372        // Start instrument refresh task
373        self.start_instrument_refresh_task()?;
374
375        self.is_connected.store(true, Ordering::Relaxed);
376        tracing::info!("Connected dYdX data client");
377
378        Ok(())
379    }
380
381    async fn disconnect(&mut self) -> anyhow::Result<()> {
382        if !self.is_connected() {
383            return Ok(());
384        }
385
386        tracing::info!("Disconnecting dYdX data client");
387
388        // Disconnect WebSocket client if present
389        if let Some(ref mut ws) = self.ws_client {
390            ws.disconnect()
391                .await
392                .context("failed to disconnect dYdX websocket")?;
393        }
394
395        self.is_connected.store(false, Ordering::Relaxed);
396        tracing::info!("Disconnected dYdX data client");
397
398        Ok(())
399    }
400
    /// Returns `true` when the connected flag is set (relaxed atomic load).
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
404
    /// Returns `true` when the client is not connected; the exact negation of `is_connected`.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
408
    /// No-op: logs at debug level and returns `Ok(())` without touching any state.
    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
        // dYdX uses a global markets channel which streams instruments implicitly.
        // There is no dedicated instruments subscription, so this is a no-op to
        // mirror the behaviour of `subscribe_instruments`.
        tracing::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
        Ok(())
    }
416
    /// No-op: logs at debug level and returns `Ok(())` without touching any state.
    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
        // dYdX does not support per-instrument instrument feed subscriptions.
        // The markets channel always streams all instruments, so this is a no-op.
        tracing::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
        Ok(())
    }
423
    /// No-op: logs at debug level and returns `Ok(())`; the markets channel is subscribed
    /// in `connect`.
    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
        // dYdX markets channel auto-subscribes to all instruments
        // No explicit subscription needed - already handled in connect()
        tracing::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
        Ok(())
    }
430
    /// No-op: logs at debug level and returns `Ok(())`; the markets channel is subscribed
    /// in `connect`.
    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
        // dYdX markets channel auto-subscribes to all instruments
        // Individual instrument subscriptions not supported - full feed only
        tracing::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
        Ok(())
    }
437
438    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
439        let ws = self.ws_client()?.clone();
440        let instrument_id = cmd.instrument_id;
441
442        // Track active subscription for reconnection recovery
443        self.active_trade_subs.insert(instrument_id, ());
444
445        self.spawn_ws(
446            async move {
447                ws.subscribe_trades(instrument_id)
448                    .await
449                    .context("trade subscription")
450            },
451            "dYdX trade subscription",
452        );
453
454        Ok(())
455    }
456
457    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
458        use nautilus_model::enums::BookType;
459
460        if cmd.book_type != BookType::L2_MBP {
461            anyhow::bail!(
462                "dYdX only supports L2_MBP order book deltas, received {:?}",
463                cmd.book_type
464            );
465        }
466
467        // Ensure local order book exists for this instrument.
468        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
469
470        // Track active subscription for periodic refresh
471        self.active_orderbook_subs.insert(cmd.instrument_id, ());
472
473        let ws = self.ws_client()?.clone();
474        let instrument_id = cmd.instrument_id;
475
476        self.spawn_ws(
477            async move {
478                ws.subscribe_orderbook(instrument_id)
479                    .await
480                    .context("orderbook subscription")
481            },
482            "dYdX orderbook subscription",
483        );
484
485        Ok(())
486    }
487
488    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
489        use nautilus_model::enums::BookType;
490
491        if cmd.book_type != BookType::L2_MBP {
492            anyhow::bail!(
493                "dYdX only supports L2_MBP order book snapshots, received {:?}",
494                cmd.book_type
495            );
496        }
497
498        // Track active subscription for periodic refresh
499        self.active_orderbook_subs.insert(cmd.instrument_id, ());
500
501        let ws = self.ws_client()?.clone();
502        let instrument_id = cmd.instrument_id;
503
504        tokio::spawn(async move {
505            if let Err(e) = ws.subscribe_orderbook(instrument_id).await {
506                tracing::error!(
507                    "Failed to subscribe to orderbook snapshot for {instrument_id}: {e:?}"
508                );
509            }
510        });
511
512        Ok(())
513    }
514
515    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
516        // dYdX doesn't have a dedicated quotes channel
517        // Quotes are synthesized from order book deltas
518        tracing::debug!(
519            "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
520            cmd.instrument_id
521        );
522
523        // Simply delegate to book deltas subscription
524        use nautilus_common::messages::data::SubscribeBookDeltas;
525        use nautilus_model::enums::BookType;
526
527        let book_cmd = SubscribeBookDeltas {
528            client_id: cmd.client_id,
529            venue: cmd.venue,
530            instrument_id: cmd.instrument_id,
531            book_type: BookType::L2_MBP,
532            depth: None,
533            managed: false,
534            params: None,
535            command_id: cmd.command_id,
536            ts_init: cmd.ts_init,
537        };
538
539        self.subscribe_book_deltas(&book_cmd)
540    }
541
542    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
543        let ws = self.ws_client()?.clone();
544        let instrument_id = cmd.bar_type.instrument_id();
545        let spec = cmd.bar_type.spec();
546
547        // Map BarType spec to dYdX candle resolution string
548        let resolution = match spec.step.get() {
549            1 => match spec.aggregation {
550                nautilus_model::enums::BarAggregation::Minute => "1MIN",
551                nautilus_model::enums::BarAggregation::Hour => "1HOUR",
552                nautilus_model::enums::BarAggregation::Day => "1DAY",
553                _ => {
554                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
555                }
556            },
557            5 => {
558                if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
559                    "5MINS"
560                } else {
561                    anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
562                }
563            }
564            15 => {
565                if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
566                    "15MINS"
567                } else {
568                    anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
569                }
570            }
571            30 => {
572                if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
573                    "30MINS"
574                } else {
575                    anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
576                }
577            }
578            4 => {
579                if spec.aggregation == nautilus_model::enums::BarAggregation::Hour {
580                    "4HOURS"
581                } else {
582                    anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
583                }
584            }
585            step => {
586                anyhow::bail!("Unsupported bar step: {step}");
587            }
588        };
589
590        // Track active subscription for reconnection recovery
591        let bar_type = cmd.bar_type;
592        self.active_bar_subs
593            .insert((instrument_id, resolution.to_string()), bar_type);
594
595        self.spawn_ws(
596            async move {
597                // Register bar type BEFORE subscribing to avoid race condition
598                let ticker = instrument_id.symbol.as_str().trim_end_matches(".DYDX");
599                let topic = format!("{ticker}/{resolution}");
600                if let Err(e) =
601                    ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
602                        topic,
603                        bar_type,
604                    })
605                {
606                    anyhow::bail!("Failed to register bar type: {e}");
607                }
608
609                // Delay to ensure handler processes registration before candle messages arrive
610                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
611
612                ws.subscribe_candles(instrument_id, resolution)
613                    .await
614                    .context("candles subscription")
615            },
616            "dYdX candles subscription",
617        );
618
619        Ok(())
620    }
621
622    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
623        // Remove from active subscription tracking
624        self.active_trade_subs.remove(&cmd.instrument_id);
625
626        let ws = self.ws_client()?.clone();
627        let instrument_id = cmd.instrument_id;
628
629        self.spawn_ws(
630            async move {
631                ws.unsubscribe_trades(instrument_id)
632                    .await
633                    .context("trade unsubscription")
634            },
635            "dYdX trade unsubscription",
636        );
637
638        Ok(())
639    }
640
641    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
642        // Remove from active subscription tracking
643        self.active_orderbook_subs.remove(&cmd.instrument_id);
644
645        let ws = self.ws_client()?.clone();
646        let instrument_id = cmd.instrument_id;
647
648        self.spawn_ws(
649            async move {
650                ws.unsubscribe_orderbook(instrument_id)
651                    .await
652                    .context("orderbook unsubscription")
653            },
654            "dYdX orderbook unsubscription",
655        );
656
657        Ok(())
658    }
659
660    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
661        // dYdX orderbook channel provides both snapshots and deltas.
662        // Unsubscribing snapshots uses the same underlying channel as deltas.
663        // Remove from active subscription tracking
664        self.active_orderbook_subs.remove(&cmd.instrument_id);
665
666        let ws = self.ws_client()?.clone();
667        let instrument_id = cmd.instrument_id;
668
669        self.spawn_ws(
670            async move {
671                ws.unsubscribe_orderbook(instrument_id)
672                    .await
673                    .context("orderbook snapshot unsubscription")
674            },
675            "dYdX orderbook snapshot unsubscription",
676        );
677
678        Ok(())
679    }
680
681    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
682        // dYdX doesn't have a dedicated quotes channel; quotes are derived from book deltas.
683        tracing::debug!(
684            "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
685            cmd.instrument_id
686        );
687
688        let book_cmd = UnsubscribeBookDeltas {
689            instrument_id: cmd.instrument_id,
690            client_id: cmd.client_id,
691            venue: cmd.venue,
692            command_id: cmd.command_id,
693            ts_init: cmd.ts_init,
694            params: cmd.params.clone(),
695        };
696
697        self.unsubscribe_book_deltas(&book_cmd)
698    }
699
700    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
701        let ws = self.ws_client()?.clone();
702        let instrument_id = cmd.bar_type.instrument_id();
703        let spec = cmd.bar_type.spec();
704
705        // Map BarType spec to dYdX candle resolution string
706        let resolution = match spec.step.get() {
707            1 => match spec.aggregation {
708                nautilus_model::enums::BarAggregation::Minute => "1MIN",
709                nautilus_model::enums::BarAggregation::Hour => "1HOUR",
710                nautilus_model::enums::BarAggregation::Day => "1DAY",
711                _ => {
712                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
713                }
714            },
715            5 => {
716                if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
717                    "5MINS"
718                } else {
719                    anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
720                }
721            }
722            15 => {
723                if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
724                    "15MINS"
725                } else {
726                    anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
727                }
728            }
729            30 => {
730                if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
731                    "30MINS"
732                } else {
733                    anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
734                }
735            }
736            4 => {
737                if spec.aggregation == nautilus_model::enums::BarAggregation::Hour {
738                    "4HOURS"
739                } else {
740                    anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
741                }
742            }
743            step => {
744                anyhow::bail!("Unsupported bar step: {step}");
745            }
746        };
747
748        // Remove from active subscription tracking
749        self.active_bar_subs
750            .remove(&(instrument_id, resolution.to_string()));
751
752        // Unregister bar type from handler
753        let ticker = instrument_id.symbol.as_str().trim_end_matches(".DYDX");
754        let topic = format!("{ticker}/{resolution}");
755        if let Err(e) =
756            ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
757        {
758            tracing::warn!("Failed to unregister bar type: {e}");
759        }
760
761        self.spawn_ws(
762            async move {
763                ws.unsubscribe_candles(instrument_id, resolution)
764                    .await
765                    .context("candles unsubscription")
766            },
767            "dYdX candles unsubscription",
768        );
769
770        Ok(())
771    }
772
773    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
774        let instruments_cache = self.instruments.clone();
775        let sender = self.data_sender.clone();
776        let http = self.http_client.clone();
777        let instrument_id = request.instrument_id;
778        let request_id = request.request_id;
779        let client_id = request.client_id.unwrap_or(self.client_id);
780        let start = request.start;
781        let end = request.end;
782        let params = request.params.clone();
783        let clock = self.clock;
784        let start_nanos = datetime_to_unix_nanos(start);
785        let end_nanos = datetime_to_unix_nanos(end);
786
787        tokio::spawn(async move {
788            // First try to get from cache
789            let symbol = Ustr::from(instrument_id.symbol.as_str());
790            let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
791                tracing::debug!("Found instrument {instrument_id} in cache");
792                Some(cached.clone())
793            } else {
794                // Not in cache, fetch from API
795                tracing::debug!("Instrument {instrument_id} not in cache, fetching from API");
796                match http.request_instruments(None, None, None).await {
797                    Ok(instruments) => {
798                        // Cache all fetched instruments
799                        for inst in &instruments {
800                            upsert_instrument(&instruments_cache, inst.clone());
801                        }
802                        // Find the requested instrument
803                        instruments.into_iter().find(|i| i.id() == instrument_id)
804                    }
805                    Err(e) => {
806                        tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
807                        None
808                    }
809                }
810            };
811
812            if let Some(inst) = instrument {
813                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
814                    request_id,
815                    client_id,
816                    instrument_id,
817                    inst,
818                    start_nanos,
819                    end_nanos,
820                    clock.get_time_ns(),
821                    params,
822                )));
823
824                if let Err(e) = sender.send(DataEvent::Response(response)) {
825                    tracing::error!("Failed to send instrument response: {e}");
826                }
827            } else {
828                tracing::error!("Instrument {instrument_id} not found");
829            }
830        });
831
832        Ok(())
833    }
834
835    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
836        let http = self.http_client.clone();
837        let sender = self.data_sender.clone();
838        let instruments_cache = self.instruments.clone();
839        let request_id = request.request_id;
840        let client_id = request.client_id.unwrap_or(self.client_id);
841        let venue = self.venue();
842        let start = request.start;
843        let end = request.end;
844        let params = request.params.clone();
845        let clock = self.clock;
846        let start_nanos = datetime_to_unix_nanos(start);
847        let end_nanos = datetime_to_unix_nanos(end);
848
849        tokio::spawn(async move {
850            match http.request_instruments(None, None, None).await {
851                Ok(instruments) => {
852                    tracing::info!("Fetched {} instruments from dYdX", instruments.len());
853
854                    // Cache all instruments
855                    for instrument in &instruments {
856                        upsert_instrument(&instruments_cache, instrument.clone());
857                    }
858
859                    let response = DataResponse::Instruments(InstrumentsResponse::new(
860                        request_id,
861                        client_id,
862                        venue,
863                        instruments,
864                        start_nanos,
865                        end_nanos,
866                        clock.get_time_ns(),
867                        params,
868                    ));
869
870                    if let Err(e) = sender.send(DataEvent::Response(response)) {
871                        tracing::error!("Failed to send instruments response: {e}");
872                    }
873                }
874                Err(e) => {
875                    tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
876
877                    // Send empty response on error
878                    let response = DataResponse::Instruments(InstrumentsResponse::new(
879                        request_id,
880                        client_id,
881                        venue,
882                        Vec::new(),
883                        start_nanos,
884                        end_nanos,
885                        clock.get_time_ns(),
886                        params,
887                    ));
888
889                    if let Err(e) = sender.send(DataEvent::Response(response)) {
890                        tracing::error!("Failed to send empty instruments response: {e}");
891                    }
892                }
893            }
894        });
895
896        Ok(())
897    }
898
899    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
900        use nautilus_model::{
901            data::TradeTick,
902            enums::{AggressorSide, OrderSide},
903            identifiers::TradeId,
904        };
905
906        let http = self.http_client.clone();
907        let instruments = self.instruments.clone();
908        let sender = self.data_sender.clone();
909        let instrument_id = request.instrument_id;
910        let start = request.start;
911        let end = request.end;
912        let limit = request.limit.map(|n| n.get() as u32);
913        let request_id = request.request_id;
914        let client_id = request.client_id.unwrap_or(self.client_id);
915        let params = request.params.clone();
916        let clock = self.clock;
917        let start_nanos = datetime_to_unix_nanos(start);
918        let end_nanos = datetime_to_unix_nanos(end);
919
920        tokio::spawn(async move {
921            // dYdX Indexer trades endpoint supports `limit` but not an explicit
922            // date range in this client; we approximate by using the provided
923            // limit and instrument metadata for precision.
924            let ticker = instrument_id
925                .symbol
926                .as_str()
927                .trim_end_matches("-PERP")
928                .to_string();
929
930            // Look up instrument to derive price and size precision.
931            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
932                Some(inst) => inst.clone(),
933                None => {
934                    tracing::error!(
935                        "request_trades: instrument {} not found in cache; cannot convert trades",
936                        instrument_id
937                    );
938                    let ts_now = clock.get_time_ns();
939                    let response = DataResponse::Trades(TradesResponse::new(
940                        request_id,
941                        client_id,
942                        instrument_id,
943                        Vec::new(),
944                        start_nanos,
945                        end_nanos,
946                        ts_now,
947                        params,
948                    ));
949                    if let Err(e) = sender.send(DataEvent::Response(response)) {
950                        tracing::error!("Failed to send empty trades response: {e}");
951                    }
952                    return;
953                }
954            };
955
956            let price_precision = instrument.price_precision();
957            let size_precision = instrument.size_precision();
958
959            match http
960                .inner
961                .get_trades(&ticker, limit)
962                .await
963                .context("failed to request trades from dYdX")
964            {
965                Ok(trades_response) => {
966                    let mut ticks = Vec::new();
967
968                    for trade in trades_response.trades {
969                        let aggressor_side = match trade.side {
970                            OrderSide::Buy => AggressorSide::Buyer,
971                            OrderSide::Sell => AggressorSide::Seller,
972                            _ => continue, // Skip unsupported side
973                        };
974
975                        let price = match Price::from_decimal_dp(trade.price, price_precision) {
976                            Ok(p) => p,
977                            Err(e) => {
978                                tracing::warn!(
979                                    "request_trades: failed to convert price for trade {}: {e}",
980                                    trade.id
981                                );
982                                continue;
983                            }
984                        };
985
986                        let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
987                            Ok(q) => q,
988                            Err(e) => {
989                                tracing::warn!(
990                                    "request_trades: failed to convert size for trade {}: {e}",
991                                    trade.id
992                                );
993                                continue;
994                            }
995                        };
996
997                        let ts_event = match trade.created_at.timestamp_nanos_opt() {
998                            Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
999                            _ => {
1000                                tracing::warn!(
1001                                    "request_trades: timestamp out of range for trade {}",
1002                                    trade.id
1003                                );
1004                                continue;
1005                            }
1006                        };
1007
1008                        // Apply optional time-range filter.
1009                        if let Some(start_ts) = start_nanos
1010                            && ts_event < start_ts
1011                        {
1012                            continue;
1013                        }
1014                        if let Some(end_ts) = end_nanos
1015                            && ts_event > end_ts
1016                        {
1017                            continue;
1018                        }
1019
1020                        let tick = TradeTick::new(
1021                            instrument_id,
1022                            price,
1023                            size,
1024                            aggressor_side,
1025                            TradeId::new(&trade.id),
1026                            ts_event,
1027                            clock.get_time_ns(),
1028                        );
1029                        ticks.push(tick);
1030                    }
1031
1032                    let response = DataResponse::Trades(TradesResponse::new(
1033                        request_id,
1034                        client_id,
1035                        instrument_id,
1036                        ticks,
1037                        start_nanos,
1038                        end_nanos,
1039                        clock.get_time_ns(),
1040                        params,
1041                    ));
1042
1043                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1044                        tracing::error!("Failed to send trades response: {e}");
1045                    }
1046                }
1047                Err(e) => {
1048                    tracing::error!("Trade request failed for {}: {e:?}", instrument_id);
1049
1050                    let response = DataResponse::Trades(TradesResponse::new(
1051                        request_id,
1052                        client_id,
1053                        instrument_id,
1054                        Vec::new(),
1055                        start_nanos,
1056                        end_nanos,
1057                        clock.get_time_ns(),
1058                        params,
1059                    ));
1060
1061                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1062                        tracing::error!("Failed to send empty trades response: {e}");
1063                    }
1064                }
1065            }
1066        });
1067
1068        Ok(())
1069    }
1070
1071    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1072        use chrono::Duration;
1073        use nautilus_model::enums::{AggregationSource, BarAggregation, PriceType};
1074
1075        const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
1076
1077        let bar_type = request.bar_type;
1078        let spec = bar_type.spec();
1079
1080        // Validate bar type requirements
1081        if bar_type.aggregation_source() != AggregationSource::External {
1082            anyhow::bail!(
1083                "dYdX only supports EXTERNAL aggregation, got {:?}",
1084                bar_type.aggregation_source()
1085            );
1086        }
1087
1088        if spec.price_type != PriceType::Last {
1089            anyhow::bail!(
1090                "dYdX only supports LAST price type, got {:?}",
1091                spec.price_type
1092            );
1093        }
1094
1095        // Map BarType spec to dYdX resolution
1096        let resolution = match spec.step.get() {
1097            1 => match spec.aggregation {
1098                BarAggregation::Minute => "1MIN",
1099                BarAggregation::Hour => "1HOUR",
1100                BarAggregation::Day => "1DAY",
1101                _ => {
1102                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
1103                }
1104            },
1105            5 if spec.aggregation == BarAggregation::Minute => "5MINS",
1106            15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1107            30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1108            4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1109            step => {
1110                anyhow::bail!("Unsupported bar step: {step}");
1111            }
1112        };
1113
1114        let http = self.http_client.clone();
1115        let instruments = self.instruments.clone();
1116        let sender = self.data_sender.clone();
1117        let instrument_id = bar_type.instrument_id();
1118        // dYdX ticker does not include the "-PERP" suffix.
1119        let symbol = instrument_id
1120            .symbol
1121            .as_str()
1122            .trim_end_matches("-PERP")
1123            .to_string();
1124        let request_id = request.request_id;
1125        let client_id = request.client_id.unwrap_or(self.client_id);
1126        let params = request.params.clone();
1127        let clock = self.clock;
1128
1129        let start = request.start;
1130        let end = request.end;
1131        let overall_limit = request.limit.map(|n| n.get() as u32);
1132
1133        // Convert optional datetimes to UnixNanos for response metadata
1134        let start_nanos = datetime_to_unix_nanos(start);
1135        let end_nanos = datetime_to_unix_nanos(end);
1136
1137        // Parse resolution string to DydxCandleResolution enum
1138        let resolution_enum = match resolution {
1139            "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1140            "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1141            "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1142            "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1143            "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1144            "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1145            "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1146            _ => {
1147                anyhow::bail!("Unsupported resolution: {resolution}");
1148            }
1149        };
1150
1151        tokio::spawn(async move {
1152            // Determine bar duration in seconds.
1153            let bar_secs: i64 = match spec.aggregation {
1154                BarAggregation::Minute => spec.step.get() as i64 * 60,
1155                BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1156                BarAggregation::Day => spec.step.get() as i64 * 86_400,
1157                _ => {
1158                    tracing::error!(
1159                        "Unsupported aggregation for request_bars: {:?}",
1160                        spec.aggregation
1161                    );
1162                    return;
1163                }
1164            };
1165
1166            // Look up instrument to derive price and size precision.
1167            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1168                Some(inst) => inst.clone(),
1169                None => {
1170                    tracing::error!(
1171                        "request_bars: instrument {} not found in cache; cannot convert candles",
1172                        instrument_id
1173                    );
1174                    let ts_now = clock.get_time_ns();
1175                    let response = DataResponse::Bars(BarsResponse::new(
1176                        request_id,
1177                        client_id,
1178                        bar_type,
1179                        Vec::new(),
1180                        start_nanos,
1181                        end_nanos,
1182                        ts_now,
1183                        params,
1184                    ));
1185                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1186                        tracing::error!("Failed to send empty bars response: {e}");
1187                    }
1188                    return;
1189                }
1190            };
1191
1192            let price_precision = instrument.price_precision();
1193            let size_precision = instrument.size_precision();
1194
1195            let mut all_bars: Vec<Bar> = Vec::new();
1196
1197            // If no explicit date range, fall back to a single request using only `limit`.
1198            let (range_start, range_end) = match (start, end) {
1199                (Some(s), Some(e)) if e > s => (s, e),
1200                _ => {
1201                    let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1202                    match http
1203                        .inner
1204                        .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1205                        .await
1206                    {
1207                        Ok(candles_response) => {
1208                            tracing::debug!(
1209                                "request_bars fetched {} candles without explicit date range",
1210                                candles_response.candles.len()
1211                            );
1212
1213                            for candle in &candles_response.candles {
1214                                match Self::candle_to_bar(
1215                                    candle,
1216                                    bar_type,
1217                                    price_precision,
1218                                    size_precision,
1219                                    bar_secs,
1220                                    clock,
1221                                ) {
1222                                    Ok(bar) => all_bars.push(bar),
1223                                    Err(e) => {
1224                                        tracing::warn!(
1225                                            "Failed to convert dYdX candle to bar for {}: {e}",
1226                                            instrument_id
1227                                        );
1228                                    }
1229                                }
1230                            }
1231
1232                            let current_time_ns = clock.get_time_ns();
1233                            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1234
1235                            let response = DataResponse::Bars(BarsResponse::new(
1236                                request_id,
1237                                client_id,
1238                                bar_type,
1239                                all_bars,
1240                                start_nanos,
1241                                end_nanos,
1242                                current_time_ns,
1243                                params,
1244                            ));
1245
1246                            if let Err(e) = sender.send(DataEvent::Response(response)) {
1247                                tracing::error!("Failed to send bars response: {e}");
1248                            }
1249                        }
1250                        Err(e) => {
1251                            tracing::error!(
1252                                "Failed to request candles for {symbol} without date range: {e:?}"
1253                            );
1254                        }
1255                    }
1256                    return;
1257                }
1258            };
1259
1260            // Calculate expected bars for the range.
1261            let total_secs = (range_end - range_start).num_seconds().max(0);
1262            let expected_bars = (total_secs / bar_secs).max(1) as u64;
1263
1264            tracing::debug!(
1265                "request_bars range {:?} -> {:?}, expected_bars ~= {}",
1266                range_start,
1267                range_end,
1268                expected_bars
1269            );
1270
1271            let mut remaining = overall_limit.unwrap_or(u32::MAX);
1272
1273            // Determine chunk duration using max bars per request.
1274            let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1275            let chunk_duration = Duration::seconds(bar_secs * bars_per_call as i64);
1276
1277            let mut chunk_start = range_start;
1278
1279            while chunk_start < range_end && remaining > 0 {
1280                let mut chunk_end = chunk_start + chunk_duration;
1281                if chunk_end > range_end {
1282                    chunk_end = range_end;
1283                }
1284
1285                let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1286
1287                tracing::debug!(
1288                    "request_bars chunk: {} -> {}, limit={}",
1289                    chunk_start,
1290                    chunk_end,
1291                    per_call_limit
1292                );
1293
1294                match http
1295                    .inner
1296                    .get_candles(
1297                        &symbol,
1298                        resolution_enum,
1299                        Some(per_call_limit),
1300                        Some(chunk_start),
1301                        Some(chunk_end),
1302                    )
1303                    .await
1304                {
1305                    Ok(candles_response) => {
1306                        let count = candles_response.candles.len() as u32;
1307
1308                        if count == 0 {
1309                            // No more data available; stop early.
1310                            break;
1311                        }
1312
1313                        // Convert candles to bars and accumulate.
1314                        for candle in &candles_response.candles {
1315                            match Self::candle_to_bar(
1316                                candle,
1317                                bar_type,
1318                                price_precision,
1319                                size_precision,
1320                                bar_secs,
1321                                clock,
1322                            ) {
1323                                Ok(bar) => all_bars.push(bar),
1324                                Err(e) => {
1325                                    tracing::warn!(
1326                                        "Failed to convert dYdX candle to bar for {}: {e}",
1327                                        instrument_id
1328                                    );
1329                                }
1330                            }
1331                        }
1332
1333                        if remaining <= count {
1334                            break;
1335                        } else {
1336                            remaining -= count;
1337                        }
1338                    }
1339                    Err(e) => {
1340                        tracing::error!(
1341                            "Failed to request candles for {symbol} in chunk {:?} -> {:?}: {e:?}",
1342                            chunk_start,
1343                            chunk_end
1344                        );
1345                        break;
1346                    }
1347                }
1348
1349                chunk_start += chunk_duration;
1350            }
1351
1352            tracing::debug!("request_bars completed partitioned fetch for {}", bar_type);
1353
1354            // Filter incomplete bars: only return bars where ts_event < current_time_ns
1355            let current_time_ns = clock.get_time_ns();
1356            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1357
1358            tracing::debug!(
1359                "request_bars filtered to {} completed bars (current_time_ns={})",
1360                all_bars.len(),
1361                current_time_ns
1362            );
1363
1364            let response = DataResponse::Bars(BarsResponse::new(
1365                request_id,
1366                client_id,
1367                bar_type,
1368                all_bars,
1369                start_nanos,
1370                end_nanos,
1371                current_time_ns,
1372                params,
1373            ));
1374
1375            if let Err(e) = sender.send(DataEvent::Response(response)) {
1376                tracing::error!("Failed to send bars response: {e}");
1377            }
1378        });
1379
1380        Ok(())
1381    }
1382}
1383
1384/// Upserts an instrument into the shared cache.
1385fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1386    let symbol = Ustr::from(instrument.id().symbol.as_str());
1387    cache.insert(symbol, instrument);
1388}
1389
1390/// Convert optional DateTime to optional UnixNanos timestamp.
1391fn datetime_to_unix_nanos(value: Option<chrono::DateTime<chrono::Utc>>) -> Option<UnixNanos> {
1392    value
1393        .and_then(|dt| dt.timestamp_nanos_opt())
1394        .and_then(|nanos| u64::try_from(nanos).ok())
1395        .map(UnixNanos::from)
1396}
1397
1398impl DydxDataClient {
1399    /// Start a task to periodically refresh instruments.
1400    ///
1401    /// This task runs in the background and updates the instrument cache
1402    /// at the configured interval.
1403    ///
1404    /// # Errors
1405    ///
1406    /// Returns an error if a refresh task is already running.
1407    pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1408        let interval_secs = match self.config.instrument_refresh_interval_secs {
1409            Some(secs) if secs > 0 => secs,
1410            _ => {
1411                tracing::info!("Instrument refresh disabled (interval not configured)");
1412                return Ok(());
1413            }
1414        };
1415
1416        let interval = Duration::from_secs(interval_secs);
1417        let http_client = self.http_client.clone();
1418        let instruments_cache = self.instruments.clone();
1419        let cancellation_token = self.cancellation_token.clone();
1420
1421        tracing::info!(
1422            "Starting instrument refresh task (interval: {}s)",
1423            interval_secs
1424        );
1425
1426        let task = tokio::spawn(async move {
1427            let mut interval_timer = tokio::time::interval(interval);
1428            interval_timer.tick().await; // Skip first immediate tick
1429
1430            loop {
1431                tokio::select! {
1432                    _ = cancellation_token.cancelled() => {
1433                        tracing::info!("Instrument refresh task cancelled");
1434                        break;
1435                    }
1436                    _ = interval_timer.tick() => {
1437                        tracing::debug!("Refreshing instruments");
1438
1439                        match http_client.request_instruments(None, None, None).await {
1440                            Ok(instruments) => {
1441                                tracing::debug!("Refreshed {} instruments", instruments.len());
1442
1443                                // Update local cache with refreshed instruments
1444                                for instrument in instruments {
1445                                    upsert_instrument(&instruments_cache, instrument);
1446                                }
1447
1448                                // Also update HTTP client cache via cache_instruments method
1449                                let all_instruments: Vec<_> = instruments_cache
1450                                    .iter()
1451                                    .map(|entry| entry.value().clone())
1452                                    .collect();
1453                                http_client.cache_instruments(all_instruments);
1454                            }
1455                            Err(e) => {
1456                                tracing::error!("Failed to refresh instruments: {}", e);
1457                            }
1458                        }
1459                    }
1460                }
1461            }
1462        });
1463
1464        self.tasks.push(task);
1465        Ok(())
1466    }
1467
1468    /// Start a background task to periodically refresh orderbook snapshots.
1469    ///
1470    /// This prevents stale orderbooks from missed WebSocket messages due to:
1471    /// - Network issues or message drops
1472    /// - dYdX validator delays
1473    /// - WebSocket reconnection gaps
1474    ///
1475    /// The task fetches fresh snapshots via HTTP at the configured interval
1476    /// and applies them to the local orderbooks.
1477    fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1478        let interval_secs = match self.config.orderbook_refresh_interval_secs {
1479            Some(secs) if secs > 0 => secs,
1480            _ => {
1481                tracing::info!("Orderbook snapshot refresh disabled (interval not configured)");
1482                return Ok(());
1483            }
1484        };
1485
1486        let interval = Duration::from_secs(interval_secs);
1487        let http_client = self.http_client.clone();
1488        let instruments = self.instruments.clone();
1489        let order_books = self.order_books.clone();
1490        let active_subs = self.active_orderbook_subs.clone();
1491        let cancellation_token = self.cancellation_token.clone();
1492        let data_sender = self.data_sender.clone();
1493
1494        tracing::info!(
1495            "Starting orderbook snapshot refresh task (interval: {}s)",
1496            interval_secs
1497        );
1498
1499        let task = tokio::spawn(async move {
1500            let mut interval_timer = tokio::time::interval(interval);
1501            interval_timer.tick().await; // Skip first immediate tick
1502
1503            loop {
1504                tokio::select! {
1505                    _ = cancellation_token.cancelled() => {
1506                        tracing::info!("Orderbook refresh task cancelled");
1507                        break;
1508                    }
1509                    _ = interval_timer.tick() => {
1510                        let active_instruments: Vec<InstrumentId> = active_subs
1511                            .iter()
1512                            .map(|entry| *entry.key())
1513                            .collect();
1514
1515                        if active_instruments.is_empty() {
1516                            tracing::debug!("No active orderbook subscriptions to refresh");
1517                            continue;
1518                        }
1519
1520                        tracing::debug!(
1521                            "Refreshing {} orderbook snapshots",
1522                            active_instruments.len()
1523                        );
1524
1525                        for instrument_id in active_instruments {
1526                            // Get instrument for parsing
1527                            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1528                                Some(inst) => inst.clone(),
1529                                None => {
1530                                    tracing::warn!(
1531                                        "Cannot refresh orderbook: no instrument for {}",
1532                                        instrument_id
1533                                    );
1534                                    continue;
1535                                }
1536                            };
1537
1538                            // Fetch snapshot via HTTP (strip -PERP suffix for dYdX API)
1539                            let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1540                            let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1541
1542                            let snapshot = match snapshot_result {
1543                                Ok(s) => s,
1544                                Err(e) => {
1545                                    tracing::error!(
1546                                        "Failed to fetch orderbook snapshot for {}: {}",
1547                                        instrument_id,
1548                                        e
1549                                    );
1550                                    continue;
1551                                }
1552                            };
1553
1554                            // Convert HTTP snapshot to OrderBookDeltas
1555                            let deltas_result = Self::parse_orderbook_snapshot(
1556                                instrument_id,
1557                                &snapshot,
1558                                &instrument,
1559                            );
1560
1561                            let deltas = match deltas_result {
1562                                Ok(d) => d,
1563                                Err(e) => {
1564                                    tracing::error!(
1565                                        "Failed to parse orderbook snapshot for {}: {}",
1566                                        instrument_id,
1567                                        e
1568                                    );
1569                                    continue;
1570                                }
1571                            };
1572
1573                            // Apply snapshot to local orderbook
1574                            if let Some(mut book) = order_books.get_mut(&instrument_id) {
1575                                if let Err(e) = book.apply_deltas(&deltas) {
1576                                    tracing::error!(
1577                                        "Failed to apply orderbook snapshot for {}: {}",
1578                                        instrument_id,
1579                                        e
1580                                    );
1581                                    continue;
1582                                }
1583
1584                                tracing::debug!(
1585                                    "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1586                                    instrument_id,
1587                                    book.best_bid_price(),
1588                                    book.best_ask_price()
1589                                );
1590                            }
1591
1592                            // Emit the snapshot deltas
1593                            use nautilus_model::data::OrderBookDeltas_API;
1594                            let data = nautilus_model::data::Data::from(OrderBookDeltas_API::new(deltas));
1595                            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1596                                tracing::error!("Failed to emit orderbook snapshot: {}", e);
1597                            }
1598                        }
1599                    }
1600                }
1601            }
1602        });
1603
1604        self.tasks.push(task);
1605        Ok(())
1606    }
1607
1608    /// Parse HTTP orderbook snapshot into OrderBookDeltas.
1609    ///
1610    /// Converts the REST API orderbook format into Nautilus deltas with CLEAR + ADD actions.
1611    fn parse_orderbook_snapshot(
1612        instrument_id: InstrumentId,
1613        snapshot: &crate::http::models::OrderbookResponse,
1614        instrument: &InstrumentAny,
1615    ) -> anyhow::Result<nautilus_model::data::OrderBookDeltas> {
1616        use nautilus_model::{
1617            data::{BookOrder, OrderBookDelta},
1618            enums::{BookAction, OrderSide, RecordFlag},
1619            instruments::Instrument,
1620            types::{Price, Quantity},
1621        };
1622
1623        let ts_init = get_atomic_clock_realtime().get_time_ns();
1624        let mut deltas = Vec::new();
1625
1626        // Add clear delta first
1627        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1628
1629        let price_precision = instrument.price_precision();
1630        let size_precision = instrument.size_precision();
1631
1632        let bids_len = snapshot.bids.len();
1633        let asks_len = snapshot.asks.len();
1634
1635        // Add bid levels
1636        for (idx, bid) in snapshot.bids.iter().enumerate() {
1637            let is_last = idx == bids_len - 1 && asks_len == 0;
1638            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1639
1640            let price = Price::from_decimal_dp(bid.price, price_precision)
1641                .context("failed to parse bid price")?;
1642            let size = Quantity::from_decimal_dp(bid.size, size_precision)
1643                .context("failed to parse bid size")?;
1644
1645            let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1646            deltas.push(OrderBookDelta::new(
1647                instrument_id,
1648                BookAction::Add,
1649                order,
1650                flags,
1651                0,
1652                ts_init,
1653                ts_init,
1654            ));
1655        }
1656
1657        // Add ask levels
1658        for (idx, ask) in snapshot.asks.iter().enumerate() {
1659            let is_last = idx == asks_len - 1;
1660            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1661
1662            let price = Price::from_decimal_dp(ask.price, price_precision)
1663                .context("failed to parse ask price")?;
1664            let size = Quantity::from_decimal_dp(ask.size, size_precision)
1665                .context("failed to parse ask size")?;
1666
1667            let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1668            deltas.push(OrderBookDelta::new(
1669                instrument_id,
1670                BookAction::Add,
1671                order,
1672                flags,
1673                0,
1674                ts_init,
1675                ts_init,
1676            ));
1677        }
1678
1679        Ok(nautilus_model::data::OrderBookDeltas::new(
1680            instrument_id,
1681            deltas,
1682        ))
1683    }
1684
1685    /// Get a cached instrument by symbol.
1686    #[must_use]
1687    pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1688        self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1689    }
1690
1691    /// Get all cached instruments.
1692    #[must_use]
1693    pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1694        self.instruments.iter().map(|i| i.clone()).collect()
1695    }
1696
1697    fn ensure_order_book(
1698        &self,
1699        instrument_id: InstrumentId,
1700        book_type: nautilus_model::enums::BookType,
1701    ) {
1702        self.order_books
1703            .entry(instrument_id)
1704            .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1705    }
1706
1707    /// Convert a dYdX HTTP candle into a Nautilus [`Bar`].
1708    ///
1709    /// This mirrors the conversion logic used in other adapters (for example
1710    /// Hyperliquid), using the instrument price/size precision and mapping the
1711    /// candle start time to `ts_init` with `ts_event` at the end of the bar
1712    /// interval.
1713    fn candle_to_bar(
1714        candle: &crate::http::models::Candle,
1715        bar_type: BarType,
1716        price_precision: u8,
1717        size_precision: u8,
1718        bar_secs: i64,
1719        clock: &AtomicTime,
1720    ) -> anyhow::Result<Bar> {
1721        use anyhow::Context;
1722
1723        // Convert candle start time to UnixNanos (ts_init).
1724        let ts_init =
1725            datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1726
1727        // Treat ts_event as the end of the bar interval.
1728        let ts_event_ns = ts_init
1729            .as_u64()
1730            .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1731        let ts_event = UnixNanos::from(ts_event_ns);
1732
1733        let open = Price::from_decimal_dp(candle.open, price_precision)
1734            .context("failed to parse candle open price")?;
1735        let high = Price::from_decimal_dp(candle.high, price_precision)
1736            .context("failed to parse candle high price")?;
1737        let low = Price::from_decimal_dp(candle.low, price_precision)
1738            .context("failed to parse candle low price")?;
1739        let close = Price::from_decimal_dp(candle.close, price_precision)
1740            .context("failed to parse candle close price")?;
1741
1742        // Use base token volume as bar volume.
1743        let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1744            .context("failed to parse candle base_token_volume")?;
1745
1746        Ok(Bar::new(
1747            bar_type, open, high, low, close, volume, ts_event, ts_init,
1748        ))
1749    }
1750
1751    fn handle_ws_message(
1752        message: crate::websocket::messages::NautilusWsMessage,
1753        ctx: &WsMessageContext,
1754    ) {
1755        match message {
1756            crate::websocket::messages::NautilusWsMessage::Data(payloads) => {
1757                Self::handle_data_message(payloads, ctx.data_sender);
1758            }
1759            crate::websocket::messages::NautilusWsMessage::Deltas(deltas) => {
1760                Self::handle_deltas_message(
1761                    *deltas,
1762                    ctx.data_sender,
1763                    ctx.order_books,
1764                    ctx.last_quotes,
1765                    ctx.instruments,
1766                );
1767            }
1768            crate::websocket::messages::NautilusWsMessage::OraclePrices(oracle_prices) => {
1769                Self::handle_oracle_prices(oracle_prices, ctx.instruments, ctx.data_sender);
1770            }
1771            crate::websocket::messages::NautilusWsMessage::Error(err) => {
1772                tracing::error!("dYdX WS error: {err}");
1773            }
1774            crate::websocket::messages::NautilusWsMessage::Reconnected => {
1775                tracing::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1776
1777                // Re-subscribe to all active subscriptions after WebSocket reconnection
1778                if let Some(ws) = ctx.ws_client {
1779                    let total_subs = ctx.active_orderbook_subs.len()
1780                        + ctx.active_trade_subs.len()
1781                        + ctx.active_bar_subs.len();
1782
1783                    if total_subs == 0 {
1784                        tracing::debug!("No active subscriptions to restore");
1785                        return;
1786                    }
1787
1788                    tracing::info!(
1789                        "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1790                        total_subs,
1791                        ctx.active_orderbook_subs.len(),
1792                        ctx.active_trade_subs.len(),
1793                        ctx.active_bar_subs.len()
1794                    );
1795
1796                    // Re-subscribe to orderbook channels
1797                    for entry in ctx.active_orderbook_subs.iter() {
1798                        let instrument_id = *entry.key();
1799                        let ws_clone = ws.clone();
1800                        tokio::spawn(async move {
1801                            if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1802                                tracing::error!(
1803                                    "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1804                                );
1805                            } else {
1806                                tracing::debug!("Re-subscribed to orderbook for {instrument_id}");
1807                            }
1808                        });
1809                    }
1810
1811                    // Re-subscribe to trade channels
1812                    for entry in ctx.active_trade_subs.iter() {
1813                        let instrument_id = *entry.key();
1814                        let ws_clone = ws.clone();
1815                        tokio::spawn(async move {
1816                            if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1817                                tracing::error!(
1818                                    "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1819                                );
1820                            } else {
1821                                tracing::debug!("Re-subscribed to trades for {instrument_id}");
1822                            }
1823                        });
1824                    }
1825
1826                    // Re-subscribe to candle/bar channels
1827                    for entry in ctx.active_bar_subs.iter() {
1828                        let (instrument_id, resolution) = entry.key();
1829                        let instrument_id = *instrument_id;
1830                        let resolution = resolution.clone();
1831                        let bar_type = *entry.value();
1832                        let ws_clone = ws.clone();
1833
1834                        // Re-register bar type with handler
1835                        let ticker = instrument_id.symbol.as_str().trim_end_matches(".DYDX");
1836                        let topic = format!("{ticker}/{resolution}");
1837                        if let Err(e) = ws.send_command(
1838                            crate::websocket::handler::HandlerCommand::RegisterBarType {
1839                                topic,
1840                                bar_type,
1841                            },
1842                        ) {
1843                            tracing::warn!(
1844                                "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1845                            );
1846                        }
1847
1848                        tokio::spawn(async move {
1849                            if let Err(e) =
1850                                ws_clone.subscribe_candles(instrument_id, &resolution).await
1851                            {
1852                                tracing::error!(
1853                                    "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1854                                );
1855                            } else {
1856                                tracing::debug!(
1857                                    "Re-subscribed to candles for {instrument_id} ({resolution})"
1858                                );
1859                            }
1860                        });
1861                    }
1862
1863                    tracing::info!("Completed re-subscription requests after reconnection");
1864                } else {
1865                    tracing::warn!("WebSocket client not available for re-subscription");
1866                }
1867            }
1868            crate::websocket::messages::NautilusWsMessage::Order(_)
1869            | crate::websocket::messages::NautilusWsMessage::Fill(_)
1870            | crate::websocket::messages::NautilusWsMessage::Position(_)
1871            | crate::websocket::messages::NautilusWsMessage::AccountState(_)
1872            | crate::websocket::messages::NautilusWsMessage::SubaccountSubscribed(_)
1873            | crate::websocket::messages::NautilusWsMessage::SubaccountsChannelData(_) => {
1874                tracing::debug!(
1875                    "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1876                );
1877            }
1878        }
1879    }
1880
1881    fn handle_data_message(
1882        payloads: Vec<NautilusData>,
1883        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1884    ) {
1885        for data in payloads {
1886            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1887                tracing::error!("Failed to emit data event: {e}");
1888            }
1889        }
1890    }
1891
1892    /// Resolves a crossed order book by generating synthetic deltas to uncross it.
1893    ///
1894    /// dYdX order books can become crossed due to:
1895    /// - Validator delays in order acknowledgment across the network
1896    /// - Missed or delayed WebSocket messages from the venue
1897    ///
1898    /// This function detects when bid_price >= ask_price and iteratively removes
1899    /// the smaller side while adjusting the larger side until the book is uncrossed.
1900    ///
1901    /// # Algorithm
1902    ///
1903    /// For each crossed level:
1904    /// - If bid_size > ask_size: DELETE ask, UPDATE bid (reduce by ask_size)
1905    /// - If bid_size < ask_size: DELETE bid, UPDATE ask (reduce by bid_size)
1906    /// - If bid_size == ask_size: DELETE both bid and ask
1907    ///
1908    /// The algorithm continues until no more crosses exist or the book is empty.
1909    fn resolve_crossed_order_book(
1910        book: &mut OrderBook,
1911        venue_deltas: nautilus_model::data::OrderBookDeltas,
1912        instrument: &InstrumentAny,
1913    ) -> anyhow::Result<nautilus_model::data::OrderBookDeltas> {
1914        let instrument_id = venue_deltas.instrument_id;
1915        let ts_init = venue_deltas.ts_init;
1916        let mut all_deltas = venue_deltas.deltas.clone();
1917
1918        // Apply the original venue deltas first
1919        book.apply_deltas(&venue_deltas)?;
1920
1921        // Check if orderbook is crossed
1922        let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1923            (book.best_bid_price(), book.best_ask_price())
1924        {
1925            bid_price >= ask_price
1926        } else {
1927            false
1928        };
1929
1930        // Iteratively uncross the orderbook
1931        while is_crossed {
1932            tracing::debug!(
1933                "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1934                instrument_id,
1935                book.best_bid_price(),
1936                book.best_ask_price()
1937            );
1938
1939            let bid_price = match book.best_bid_price() {
1940                Some(p) => p,
1941                None => break,
1942            };
1943            let ask_price = match book.best_ask_price() {
1944                Some(p) => p,
1945                None => break,
1946            };
1947            let bid_size = match book.best_bid_size() {
1948                Some(s) => s,
1949                None => break,
1950            };
1951            let ask_size = match book.best_ask_size() {
1952                Some(s) => s,
1953                None => break,
1954            };
1955
1956            let mut temp_deltas = Vec::new();
1957
1958            if bid_size > ask_size {
1959                // Remove ask level, reduce bid level
1960                let new_bid_size = Quantity::new(
1961                    bid_size.as_f64() - ask_size.as_f64(),
1962                    instrument.size_precision(),
1963                );
1964                temp_deltas.push(OrderBookDelta::new(
1965                    instrument_id,
1966                    BookAction::Update,
1967                    BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1968                    0,
1969                    0,
1970                    ts_init,
1971                    ts_init,
1972                ));
1973                temp_deltas.push(OrderBookDelta::new(
1974                    instrument_id,
1975                    BookAction::Delete,
1976                    BookOrder::new(
1977                        OrderSide::Sell,
1978                        ask_price,
1979                        Quantity::new(0.0, instrument.size_precision()),
1980                        0,
1981                    ),
1982                    0,
1983                    0,
1984                    ts_init,
1985                    ts_init,
1986                ));
1987            } else if bid_size < ask_size {
1988                // Remove bid level, reduce ask level
1989                let new_ask_size = Quantity::new(
1990                    ask_size.as_f64() - bid_size.as_f64(),
1991                    instrument.size_precision(),
1992                );
1993                temp_deltas.push(OrderBookDelta::new(
1994                    instrument_id,
1995                    BookAction::Update,
1996                    BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1997                    0,
1998                    0,
1999                    ts_init,
2000                    ts_init,
2001                ));
2002                temp_deltas.push(OrderBookDelta::new(
2003                    instrument_id,
2004                    BookAction::Delete,
2005                    BookOrder::new(
2006                        OrderSide::Buy,
2007                        bid_price,
2008                        Quantity::new(0.0, instrument.size_precision()),
2009                        0,
2010                    ),
2011                    0,
2012                    0,
2013                    ts_init,
2014                    ts_init,
2015                ));
2016            } else {
2017                // Equal sizes: remove both levels
2018                temp_deltas.push(OrderBookDelta::new(
2019                    instrument_id,
2020                    BookAction::Delete,
2021                    BookOrder::new(
2022                        OrderSide::Buy,
2023                        bid_price,
2024                        Quantity::new(0.0, instrument.size_precision()),
2025                        0,
2026                    ),
2027                    0,
2028                    0,
2029                    ts_init,
2030                    ts_init,
2031                ));
2032                temp_deltas.push(OrderBookDelta::new(
2033                    instrument_id,
2034                    BookAction::Delete,
2035                    BookOrder::new(
2036                        OrderSide::Sell,
2037                        ask_price,
2038                        Quantity::new(0.0, instrument.size_precision()),
2039                        0,
2040                    ),
2041                    0,
2042                    0,
2043                    ts_init,
2044                    ts_init,
2045                ));
2046            }
2047
2048            // Apply temporary deltas to the book
2049            let temp_deltas_obj =
2050                nautilus_model::data::OrderBookDeltas::new(instrument_id, temp_deltas.clone());
2051            book.apply_deltas(&temp_deltas_obj)?;
2052            all_deltas.extend(temp_deltas);
2053
2054            // Check if still crossed
2055            is_crossed = if let (Some(bid_price), Some(ask_price)) =
2056                (book.best_bid_price(), book.best_ask_price())
2057            {
2058                bid_price >= ask_price
2059            } else {
2060                false
2061            };
2062        }
2063
2064        // Set F_LAST flag on the final delta
2065        if let Some(last_delta) = all_deltas.last_mut() {
2066            last_delta.flags = RecordFlag::F_LAST as u8;
2067        }
2068
2069        Ok(nautilus_model::data::OrderBookDeltas::new(
2070            instrument_id,
2071            all_deltas,
2072        ))
2073    }
2074
2075    fn handle_deltas_message(
2076        deltas: nautilus_model::data::OrderBookDeltas,
2077        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2078        order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
2079        last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
2080        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2081    ) {
2082        use nautilus_model::enums::BookType;
2083
2084        let instrument_id = deltas.instrument_id;
2085
2086        // Get instrument for crossed orderbook resolution
2087        let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
2088            Some(inst) => inst.clone(),
2089            None => {
2090                tracing::error!(
2091                    "Cannot resolve crossed order book: no instrument for {instrument_id}"
2092                );
2093                // Still emit the raw deltas even without instrument
2094                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
2095                    OrderBookDeltas_API::new(deltas),
2096                ))) {
2097                    tracing::error!("Failed to emit order book deltas: {e}");
2098                }
2099                return;
2100            }
2101        };
2102
2103        // Get or create order book
2104        let mut book = order_books
2105            .entry(instrument_id)
2106            .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
2107
2108        // Resolve crossed orderbook (applies deltas internally)
2109        let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2110        {
2111            Ok(d) => d,
2112            Err(e) => {
2113                tracing::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2114                return;
2115            }
2116        };
2117
2118        // Generate QuoteTick from updated top-of-book
2119        // Edge case: If orderbook is empty after deltas, fall back to last quote
2120        let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2121            (book.best_bid_price(), book.best_ask_price())
2122            && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2123        {
2124            Some(QuoteTick::new(
2125                instrument_id,
2126                bid_price,
2127                ask_price,
2128                bid_size,
2129                ask_size,
2130                resolved_deltas.ts_event,
2131                resolved_deltas.ts_init,
2132            ))
2133        } else {
2134            // Edge case: Empty orderbook levels - use last quote as fallback
2135            if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2136                tracing::debug!(
2137                    "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2138                );
2139                last_quotes.get(&instrument_id).map(|q| *q)
2140            } else {
2141                None
2142            }
2143        };
2144
2145        if let Some(quote) = quote_opt {
2146            // Only emit when top-of-book changes
2147            let emit_quote =
2148                !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2149
2150            if emit_quote {
2151                last_quotes.insert(instrument_id, quote);
2152                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2153                    tracing::error!("Failed to emit quote tick: {e}");
2154                }
2155            }
2156        } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2157            // Partial orderbook (only one side) - log but don't emit
2158            tracing::debug!(
2159                "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2160                book.best_bid_price(),
2161                book.best_ask_price()
2162            );
2163        }
2164
2165        // Emit the resolved order book deltas
2166        let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2167        if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2168            tracing::error!("Failed to emit order book deltas event: {e}");
2169        }
2170    }
2171
2172    fn handle_oracle_prices(
2173        oracle_prices: std::collections::HashMap<
2174            String,
2175            crate::websocket::types::DydxOraclePriceMarket,
2176        >,
2177        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2178        _data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2179    ) {
2180        use crate::types::DydxOraclePrice;
2181
2182        let ts_init = get_atomic_clock_realtime().get_time_ns();
2183
2184        for (symbol_str, oracle_market) in oracle_prices {
2185            let symbol = Ustr::from(&symbol_str);
2186
2187            // Get instrument to access instrument_id
2188            let Some(instrument) = instruments.get(&symbol) else {
2189                tracing::debug!(
2190                    symbol = %symbol,
2191                    "Received oracle price for unknown instrument (not cached yet)"
2192                );
2193                continue;
2194            };
2195
2196            let instrument_id = instrument.id();
2197
2198            // Parse oracle price string to Price
2199            let oracle_price_str = &oracle_market.oracle_price;
2200            let Ok(oracle_price_f64) = oracle_price_str.parse::<f64>() else {
2201                tracing::error!(
2202                    symbol = %symbol,
2203                    price_str = %oracle_price_str,
2204                    "Failed to parse oracle price as f64"
2205                );
2206                continue;
2207            };
2208
2209            let price_precision = instrument.price_precision();
2210            let oracle_price = Price::from_raw(
2211                (oracle_price_f64 * 10_f64.powi(price_precision as i32)) as PriceRaw,
2212                price_precision,
2213            );
2214
2215            let oracle_price_event = DydxOraclePrice::new(
2216                instrument_id,
2217                oracle_price,
2218                ts_init, // Use ts_init as ts_event since dYdX doesn't provide event timestamp
2219                ts_init,
2220            );
2221
2222            tracing::debug!(
2223                instrument_id = %instrument_id,
2224                oracle_price = %oracle_price,
2225                "Received dYdX oracle price: {oracle_price_event:?} (not yet forwarded as Data event)"
2226            );
2227
2228            // TODO: Forward oracle price once nautilus_model::data::Data supports custom types
2229            // Similar to databento adapter handling of imbalance/statistics data
2230        }
2231    }
2232}
2233
2234////////////////////////////////////////////////////////////////////////////////
2235// Tests
2236////////////////////////////////////////////////////////////////////////////////
2237#[cfg(test)]
2238mod tests {
2239    use std::{collections::HashMap, net::SocketAddr};
2240
2241    use axum::{
2242        Router,
2243        extract::{Path, Query, State},
2244        response::Json,
2245        routing::get,
2246    };
2247    use indexmap::IndexMap;
2248    use nautilus_common::{
2249        messages::{DataEvent, data::DataResponse},
2250        runner::set_data_event_sender,
2251    };
2252    use nautilus_core::UUID4;
2253    use nautilus_model::{
2254        data::{
2255            BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2256            TradeTick, order::BookOrder,
2257        },
2258        enums::{
2259            AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2260            PriceType,
2261        },
2262        identifiers::{ClientId, InstrumentId, Symbol, Venue},
2263        instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2264        orderbook::OrderBook,
2265        types::{Price, Quantity},
2266    };
2267    use rstest::rstest;
2268    use rust_decimal::Decimal;
2269    use rust_decimal_macros::dec;
2270    use tokio::net::TcpListener;
2271
2272    use super::*;
2273    use crate::http::models::{Candle, CandlesResponse};
2274
2275    fn setup_test_env() {
2276        // Initialize data event sender for tests
2277        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2278        set_data_event_sender(sender);
2279    }
2280
2281    #[rstest]
2282    fn test_new_data_client() {
2283        setup_test_env();
2284
2285        let client_id = ClientId::from("DYDX-001");
2286        let config = DydxDataClientConfig::default();
2287        let http_client = DydxHttpClient::default();
2288
2289        let client = DydxDataClient::new(client_id, config, http_client, None);
2290        assert!(client.is_ok());
2291
2292        let client = client.unwrap();
2293        assert_eq!(client.client_id(), client_id);
2294        assert_eq!(client.venue(), *DYDX_VENUE);
2295        assert!(!client.is_connected());
2296    }
2297
2298    #[tokio::test]
2299    async fn test_data_client_lifecycle() {
2300        setup_test_env();
2301
2302        let client_id = ClientId::from("DYDX-001");
2303        let config = DydxDataClientConfig::default();
2304        let http_client = DydxHttpClient::default();
2305
2306        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2307
2308        // Test start
2309        assert!(client.start().is_ok());
2310
2311        // Test stop
2312        assert!(client.stop().is_ok());
2313        assert!(!client.is_connected());
2314
2315        // Test reset
2316        assert!(client.reset().is_ok());
2317
2318        // Test dispose
2319        assert!(client.dispose().is_ok());
2320    }
2321
2322    #[rstest]
2323    fn test_subscribe_unsubscribe_instruments_noop() {
2324        setup_test_env();
2325
2326        let client_id = ClientId::from("DYDX-TEST");
2327        let config = DydxDataClientConfig::default();
2328        let http_client = DydxHttpClient::default();
2329
2330        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2331
2332        let venue = *DYDX_VENUE;
2333        let command_id = UUID4::new();
2334        let ts_init = get_atomic_clock_realtime().get_time_ns();
2335
2336        let subscribe = SubscribeInstruments {
2337            client_id: Some(client_id),
2338            venue,
2339            command_id,
2340            ts_init,
2341            params: None,
2342        };
2343        let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None);
2344
2345        // No-op methods should succeed even without a WebSocket client.
2346        assert!(client.subscribe_instruments(&subscribe).is_ok());
2347        assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2348    }
2349
2350    #[tokio::test]
2351    async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2352        setup_test_env();
2353
2354        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2355        let instruments = Arc::new(DashMap::new());
2356        let order_books = Arc::new(DashMap::new());
2357        let last_quotes = Arc::new(DashMap::new());
2358        let ws_client: Option<DydxWebSocketClient> = None;
2359        let active_orderbook_subs = Arc::new(DashMap::new());
2360        let active_trade_subs = Arc::new(DashMap::new());
2361        let active_bar_subs = Arc::new(DashMap::new());
2362
2363        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2364        let bar_ts = get_atomic_clock_realtime().get_time_ns();
2365
2366        // Add a test instrument to the cache (required for crossed book resolution)
2367        use nautilus_model::{identifiers::Symbol, instruments::CryptoPerpetual, types::Currency};
2368        let symbol = Symbol::from("BTC-USD-PERP");
2369        let instrument = CryptoPerpetual::new(
2370            instrument_id,
2371            symbol,
2372            Currency::BTC(),
2373            Currency::USD(),
2374            Currency::USD(),
2375            false,
2376            2,
2377            4,
2378            Price::from("0.01"),
2379            Quantity::from("0.0001"),
2380            None,
2381            None,
2382            None,
2383            None,
2384            None,
2385            None,
2386            None,
2387            None,
2388            None,
2389            None,
2390            None,
2391            None,
2392            bar_ts,
2393            bar_ts,
2394        );
2395        instruments.insert(
2396            Ustr::from("BTC-USD-PERP"),
2397            InstrumentAny::CryptoPerpetual(instrument),
2398        );
2399
2400        let price = Price::from("100.00");
2401        let size = Quantity::from("1.0");
2402
2403        // Create both bid and ask deltas to generate a quote
2404        let bid_delta = OrderBookDelta::new(
2405            instrument_id,
2406            BookAction::Add,
2407            nautilus_model::data::order::BookOrder::new(
2408                nautilus_model::enums::OrderSide::Buy,
2409                price,
2410                size,
2411                1,
2412            ),
2413            0,
2414            1,
2415            bar_ts,
2416            bar_ts,
2417        );
2418        let ask_delta = OrderBookDelta::new(
2419            instrument_id,
2420            BookAction::Add,
2421            nautilus_model::data::order::BookOrder::new(
2422                nautilus_model::enums::OrderSide::Sell,
2423                Price::from("101.00"),
2424                size,
2425                1,
2426            ),
2427            0,
2428            1,
2429            bar_ts,
2430            bar_ts,
2431        );
2432        let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2433
2434        let message = crate::websocket::messages::NautilusWsMessage::Deltas(Box::new(deltas));
2435
2436        let ctx = WsMessageContext {
2437            data_sender: &sender,
2438            instruments: &instruments,
2439            order_books: &order_books,
2440            last_quotes: &last_quotes,
2441            ws_client: &ws_client,
2442            active_orderbook_subs: &active_orderbook_subs,
2443            active_trade_subs: &active_trade_subs,
2444            active_bar_subs: &active_bar_subs,
2445        };
2446        DydxDataClient::handle_ws_message(message, &ctx);
2447
2448        // Ensure order book was created and top-of-book quote cached.
2449        assert!(order_books.get(&instrument_id).is_some());
2450        assert!(last_quotes.get(&instrument_id).is_some());
2451
2452        // Ensure a quote and deltas Data events were emitted.
2453        let mut saw_quote = false;
2454        let mut saw_deltas = false;
2455
2456        while let Ok(event) = rx.try_recv() {
2457            if let DataEvent::Data(data) = event {
2458                match data {
2459                    NautilusData::Quote(_) => saw_quote = true,
2460                    NautilusData::Deltas(_) => saw_deltas = true,
2461                    _ => {}
2462                }
2463            }
2464        }
2465
2466        assert!(saw_quote);
2467        assert!(saw_deltas);
2468    }
2469
2470    #[rstest]
2471    fn test_handle_ws_message_error_does_not_panic() {
2472        // Ensure malformed/error WebSocket messages are logged and ignored
2473        // without panicking or affecting client state.
2474        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2475        let instruments = Arc::new(DashMap::new());
2476        let order_books = Arc::new(DashMap::new());
2477        let last_quotes = Arc::new(DashMap::new());
2478        let ws_client: Option<DydxWebSocketClient> = None;
2479        let active_orderbook_subs = Arc::new(DashMap::new());
2480        let active_trade_subs = Arc::new(DashMap::new());
2481        let active_bar_subs = Arc::new(DashMap::new());
2482
2483        let ctx = WsMessageContext {
2484            data_sender: &sender,
2485            instruments: &instruments,
2486            order_books: &order_books,
2487            last_quotes: &last_quotes,
2488            ws_client: &ws_client,
2489            active_orderbook_subs: &active_orderbook_subs,
2490            active_trade_subs: &active_trade_subs,
2491            active_bar_subs: &active_bar_subs,
2492        };
2493
2494        let err = crate::websocket::error::DydxWebSocketError::from_message(
2495            "malformed WebSocket payload".to_string(),
2496        );
2497
2498        DydxDataClient::handle_ws_message(
2499            crate::websocket::messages::NautilusWsMessage::Error(err),
2500            &ctx,
2501        );
2502    }
2503
2504    #[tokio::test]
2505    async fn test_request_bars_partitioning_math_does_not_panic() {
2506        setup_test_env();
2507
2508        let client_id = ClientId::from("DYDX-BARS");
2509        let config = DydxDataClientConfig::default();
2510        let http_client = DydxHttpClient::default();
2511
2512        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2513
2514        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2515        let spec = BarSpecification {
2516            step: std::num::NonZeroUsize::new(1).unwrap(),
2517            aggregation: BarAggregation::Minute,
2518            price_type: PriceType::Last,
2519        };
2520        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2521
2522        let now = chrono::Utc::now();
2523        let start = Some(now - chrono::Duration::hours(10));
2524        let end = Some(now);
2525
2526        let request = RequestBars::new(
2527            bar_type,
2528            start,
2529            end,
2530            None,
2531            Some(client_id),
2532            UUID4::new(),
2533            get_atomic_clock_realtime().get_time_ns(),
2534            None,
2535        );
2536
2537        // We only verify that the partitioning logic executes without panicking;
2538        // HTTP calls are allowed to fail and are handled internally.
2539        assert!(client.request_bars(&request).is_ok());
2540    }
2541
2542    #[tokio::test]
2543    async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2544        setup_test_env();
2545
2546        // Prepare a simple candles response served by a local Axum HTTP server.
2547        let now = chrono::Utc::now();
2548        let candle = crate::http::models::Candle {
2549            started_at: now - chrono::Duration::minutes(1),
2550            ticker: "BTC-USD".to_string(),
2551            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2552            open: dec!(100.0),
2553            high: dec!(101.0),
2554            low: dec!(99.0),
2555            close: dec!(100.5),
2556            base_token_volume: dec!(1.0),
2557            usd_volume: dec!(100.0),
2558            trades: 10,
2559            starting_open_interest: dec!(1000.0),
2560        };
2561        let candles_response = crate::http::models::CandlesResponse {
2562            candles: vec![candle],
2563        };
2564        let state = CandlesTestState {
2565            response: Arc::new(candles_response),
2566        };
2567        let addr = start_candles_test_server(state).await;
2568        let base_url = format!("http://{}", addr);
2569
2570        let client_id = ClientId::from("DYDX-BARS-MONTHS");
2571        let config = DydxDataClientConfig {
2572            base_url_http: Some(base_url),
2573            is_testnet: true,
2574            ..Default::default()
2575        };
2576
2577        let http_client = DydxHttpClient::new(
2578            config.base_url_http.clone(),
2579            config.http_timeout_secs,
2580            config.http_proxy_url.clone(),
2581            config.is_testnet,
2582            None,
2583        )
2584        .unwrap();
2585
2586        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2587
2588        // Seed instrument cache so request_bars can resolve precision.
2589        let instrument = create_test_instrument_any();
2590        let instrument_id = instrument.id();
2591        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2592        client.instruments.insert(symbol_key, instrument);
2593
2594        let spec = BarSpecification {
2595            step: std::num::NonZeroUsize::new(1).unwrap(),
2596            aggregation: BarAggregation::Minute,
2597            price_type: PriceType::Last,
2598        };
2599        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2600
2601        // Use a date range spanning multiple months to exercise partitioning math.
2602        let start = Some(now - chrono::Duration::days(90));
2603        let end = Some(now);
2604
2605        // Limit the total number of bars so the test completes quickly.
2606        let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2607
2608        let request = RequestBars::new(
2609            bar_type,
2610            start,
2611            end,
2612            limit,
2613            Some(client_id),
2614            UUID4::new(),
2615            get_atomic_clock_realtime().get_time_ns(),
2616            None,
2617        );
2618
2619        assert!(client.request_bars(&request).is_ok());
2620    }
2621
2622    #[derive(Clone)]
2623    struct OrderbookTestState {
2624        snapshot: Arc<crate::http::models::OrderbookResponse>,
2625    }
2626
2627    #[derive(Clone)]
2628    struct TradesTestState {
2629        response: Arc<crate::http::models::TradesResponse>,
2630        last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
2631        last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
2632    }
2633
2634    #[derive(Clone)]
2635    struct CandlesTestState {
2636        response: Arc<crate::http::models::CandlesResponse>,
2637    }
2638
2639    async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2640        async fn handle_orderbook(
2641            Path(_ticker): Path<String>,
2642            State(state): State<OrderbookTestState>,
2643        ) -> Json<crate::http::models::OrderbookResponse> {
2644            Json((*state.snapshot).clone())
2645        }
2646
2647        let router = Router::new().route(
2648            "/v4/orderbooks/perpetualMarket/{ticker}",
2649            get(handle_orderbook).with_state(state),
2650        );
2651
2652        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2653        let addr = listener.local_addr().unwrap();
2654
2655        tokio::spawn(async move {
2656            axum::serve(listener, router.into_make_service())
2657                .await
2658                .unwrap();
2659        });
2660
2661        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2662        addr
2663    }
2664
2665    async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2666        async fn handle_trades(
2667            Path(ticker): Path<String>,
2668            Query(params): Query<HashMap<String, String>>,
2669            State(state): State<TradesTestState>,
2670        ) -> Json<crate::http::models::TradesResponse> {
2671            {
2672                let mut last_ticker = state.last_ticker.lock().await;
2673                *last_ticker = Some(ticker);
2674            }
2675
2676            let limit = params
2677                .get("limit")
2678                .and_then(|value| value.parse::<u32>().ok());
2679            {
2680                let mut last_limit = state.last_limit.lock().await;
2681                *last_limit = Some(limit);
2682            }
2683
2684            Json((*state.response).clone())
2685        }
2686
2687        let router = Router::new().route(
2688            "/v4/trades/perpetualMarket/{ticker}",
2689            get(handle_trades).with_state(state),
2690        );
2691
2692        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2693        let addr = listener.local_addr().unwrap();
2694
2695        tokio::spawn(async move {
2696            axum::serve(listener, router.into_make_service())
2697                .await
2698                .unwrap();
2699        });
2700
2701        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2702        addr
2703    }
2704
2705    async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2706        async fn handle_candles(
2707            Path(_ticker): Path<String>,
2708            Query(_params): Query<HashMap<String, String>>,
2709            State(state): State<CandlesTestState>,
2710        ) -> Json<crate::http::models::CandlesResponse> {
2711            Json((*state.response).clone())
2712        }
2713
2714        let router = Router::new().route(
2715            "/v4/candles/perpetualMarkets/{ticker}",
2716            get(handle_candles).with_state(state),
2717        );
2718
2719        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2720        let addr = listener.local_addr().unwrap();
2721
2722        tokio::spawn(async move {
2723            axum::serve(listener, router.into_make_service())
2724                .await
2725                .unwrap();
2726        });
2727
2728        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2729        addr
2730    }
2731
2732    fn create_test_instrument_any() -> InstrumentAny {
2733        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2734
2735        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2736            instrument_id,
2737            instrument_id.symbol,
2738            nautilus_model::types::currency::Currency::BTC(),
2739            nautilus_model::types::currency::Currency::USD(),
2740            nautilus_model::types::currency::Currency::USD(),
2741            false,
2742            2,                                // price_precision
2743            8,                                // size_precision
2744            Price::new(0.01, 2),              // price_increment
2745            Quantity::new(0.001, 8),          // size_increment
2746            Some(Quantity::new(1.0, 0)),      // multiplier
2747            Some(Quantity::new(0.001, 8)),    // lot_size
2748            Some(Quantity::new(100000.0, 8)), // max_quantity
2749            Some(Quantity::new(0.001, 8)),    // min_quantity
2750            None,                             // max_notional
2751            None,                             // min_notional
2752            Some(Price::new(1000000.0, 2)),   // max_price
2753            Some(Price::new(0.01, 2)),        // min_price
2754            Some(dec!(0.05)),                 // margin_init
2755            Some(dec!(0.03)),                 // margin_maint
2756            Some(dec!(0.0002)),               // maker_fee
2757            Some(dec!(0.0005)),               // taker_fee
2758            UnixNanos::default(),             // ts_event
2759            UnixNanos::default(),             // ts_init
2760        ))
2761    }
2762
2763    // ------------------------------------------------------------------------
2764    // Precision & bar conversion tests
2765    // ------------------------------------------------------------------------
2766
2767    #[tokio::test]
2768    async fn test_candle_to_bar_price_size_edge_cases() {
2769        setup_test_env();
2770
2771        let clock = get_atomic_clock_realtime();
2772        let now = chrono::Utc::now();
2773
2774        // Very large prices and sizes (edge cases).
2775        let candle = Candle {
2776            started_at: now,
2777            ticker: "BTC-USD".to_string(),
2778            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2779            open: dec!(123456789.123456),
2780            high: dec!(987654321.987654),  // high is max
2781            low: dec!(123456.789),         // low is min
2782            close: dec!(223456789.123456), // close between low and high
2783            base_token_volume: dec!(0.00000001),
2784            usd_volume: dec!(1234500.0),
2785            trades: 42,
2786            starting_open_interest: dec!(1000.0),
2787        };
2788
2789        let instrument = create_test_instrument_any();
2790        let instrument_id = instrument.id();
2791        let spec = BarSpecification {
2792            step: std::num::NonZeroUsize::new(1).unwrap(),
2793            aggregation: BarAggregation::Minute,
2794            price_type: PriceType::Last,
2795        };
2796        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2797
2798        let bar = DydxDataClient::candle_to_bar(
2799            &candle,
2800            bar_type,
2801            instrument.price_precision(),
2802            instrument.size_precision(),
2803            60,
2804            clock,
2805        )
2806        .expect("candle_to_bar should handle large/scientific values");
2807
2808        assert!(bar.open.as_f64() > 0.0);
2809        assert!(bar.high.as_f64() >= bar.low.as_f64());
2810        assert!(bar.volume.as_f64() > 0.0);
2811    }
2812
2813    #[tokio::test]
2814    async fn test_candle_to_bar_ts_event_overflow_safe() {
2815        setup_test_env();
2816
2817        let clock = get_atomic_clock_realtime();
2818        let now = chrono::Utc::now();
2819
2820        let candle = Candle {
2821            started_at: now,
2822            ticker: "BTC-USD".to_string(),
2823            resolution: crate::common::enums::DydxCandleResolution::OneDay,
2824            open: Decimal::from(1),
2825            high: Decimal::from(1),
2826            low: Decimal::from(1),
2827            close: Decimal::from(1),
2828            base_token_volume: Decimal::from(1),
2829            usd_volume: Decimal::from(1),
2830            trades: 1,
2831            starting_open_interest: Decimal::from(1),
2832        };
2833
2834        let instrument = create_test_instrument_any();
2835        let instrument_id = instrument.id();
2836        let spec = BarSpecification {
2837            step: std::num::NonZeroUsize::new(1).unwrap(),
2838            aggregation: BarAggregation::Day,
2839            price_type: PriceType::Last,
2840        };
2841        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2842
2843        // Use an intentionally large bar_secs to exercise saturating_add path.
2844        let bar_secs = i64::MAX / 1_000_000_000;
2845        let bar = DydxDataClient::candle_to_bar(
2846            &candle,
2847            bar_type,
2848            instrument.price_precision(),
2849            instrument.size_precision(),
2850            bar_secs,
2851            clock,
2852        )
2853        .expect("candle_to_bar should not overflow on ts_event");
2854
2855        assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2856    }
2857
2858    #[tokio::test]
2859    async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2860        // Simulate bars with ts_event both before and after current_time_ns and
2861        // ensure only completed bars (ts_event < now) are retained.
2862        let clock = get_atomic_clock_realtime();
2863        let now = chrono::Utc::now();
2864
2865        // Use a dedicated data channel for this test and register it
2866        // before constructing the data client.
2867        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2868        set_data_event_sender(sender);
2869
2870        // two candles: one in the past, one in the future
2871        let candle_past = Candle {
2872            started_at: now - chrono::Duration::minutes(2),
2873            ticker: "BTC-USD".to_string(),
2874            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2875            open: Decimal::from(1),
2876            high: Decimal::from(2),
2877            low: Decimal::from(1),
2878            close: Decimal::from(1),
2879            base_token_volume: Decimal::from(1),
2880            usd_volume: Decimal::from(1),
2881            trades: 1,
2882            starting_open_interest: Decimal::from(1),
2883        };
2884        let candle_future = Candle {
2885            started_at: now + chrono::Duration::minutes(2),
2886            ..candle_past.clone()
2887        };
2888
2889        let candles_response = CandlesResponse {
2890            candles: vec![candle_past, candle_future],
2891        };
2892
2893        let state = CandlesTestState {
2894            response: Arc::new(candles_response),
2895        };
2896        let addr = start_candles_test_server(state).await;
2897        let base_url = format!("http://{}", addr);
2898
2899        let client_id = ClientId::from("DYDX-BARS-SKEW");
2900        let config = DydxDataClientConfig {
2901            base_url_http: Some(base_url),
2902            is_testnet: true,
2903            ..Default::default()
2904        };
2905
2906        let http_client = DydxHttpClient::new(
2907            config.base_url_http.clone(),
2908            config.http_timeout_secs,
2909            config.http_proxy_url.clone(),
2910            config.is_testnet,
2911            None,
2912        )
2913        .unwrap();
2914
2915        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2916
2917        let instrument = create_test_instrument_any();
2918        let instrument_id = instrument.id();
2919        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
2920        client.instruments.insert(symbol_key, instrument);
2921
2922        let spec = BarSpecification {
2923            step: std::num::NonZeroUsize::new(1).unwrap(),
2924            aggregation: BarAggregation::Minute,
2925            price_type: PriceType::Last,
2926        };
2927        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2928
2929        let request = RequestBars::new(
2930            bar_type,
2931            Some(now - chrono::Duration::minutes(5)),
2932            Some(now + chrono::Duration::minutes(5)),
2933            None,
2934            Some(client_id),
2935            UUID4::new(),
2936            clock.get_time_ns(),
2937            None,
2938        );
2939
2940        assert!(client.request_bars(&request).is_ok());
2941
2942        let timeout = tokio::time::Duration::from_secs(3);
2943        if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
2944            tokio::time::timeout(timeout, rx.recv()).await
2945        {
2946            // Only the past candle should remain after filtering.
2947            assert_eq!(resp.data.len(), 1);
2948        }
2949    }
2950
2951    #[rstest]
2952    fn test_decimal_to_f64_precision_loss_within_tolerance() {
2953        // Verify converting via Price/Quantity preserves reasonable precision.
2954        let price_value = 12345.125_f64;
2955        let qty_value = 0.00012345_f64;
2956
2957        let price = Price::new(price_value, 6);
2958        let qty = Quantity::new(qty_value, 8);
2959
2960        let price_diff = (price.as_f64() - price_value).abs();
2961        let qty_diff = (qty.as_f64() - qty_value).abs();
2962
2963        // Differences should be well within a tiny epsilon.
2964        assert!(price_diff < 1e-10);
2965        assert!(qty_diff < 1e-12);
2966    }
2967
2968    #[tokio::test]
2969    async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
2970        // Set up a dedicated data event channel for this test.
2971        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2972        set_data_event_sender(sender);
2973
2974        // Prepare a static orderbook snapshot served by a local Axum HTTP server.
2975        let snapshot = crate::http::models::OrderbookResponse {
2976            bids: vec![crate::http::models::OrderbookLevel {
2977                price: dec!(100.0),
2978                size: dec!(1.0),
2979            }],
2980            asks: vec![crate::http::models::OrderbookLevel {
2981                price: dec!(101.0),
2982                size: dec!(2.0),
2983            }],
2984        };
2985        let state = OrderbookTestState {
2986            snapshot: Arc::new(snapshot),
2987        };
2988        let addr = start_orderbook_test_server(state).await;
2989        let base_url = format!("http://{}", addr);
2990
2991        // Configure the data client with a short refresh interval and mock HTTP base URL.
2992        let client_id = ClientId::from("DYDX-REFRESH");
2993        let config = DydxDataClientConfig {
2994            is_testnet: true,
2995            base_url_http: Some(base_url),
2996            orderbook_refresh_interval_secs: Some(1),
2997            instrument_refresh_interval_secs: None,
2998            ..Default::default()
2999        };
3000
3001        let http_client = DydxHttpClient::new(
3002            config.base_url_http.clone(),
3003            config.http_timeout_secs,
3004            config.http_proxy_url.clone(),
3005            config.is_testnet,
3006            None,
3007        )
3008        .unwrap();
3009
3010        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3011
3012        // Seed instruments and orderbook state for a single instrument.
3013        let instrument = create_test_instrument_any();
3014        let instrument_id = instrument.id();
3015        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3016        client.instruments.insert(symbol_key, instrument);
3017        client.order_books.insert(
3018            instrument_id,
3019            OrderBook::new(instrument_id, BookType::L2_MBP),
3020        );
3021        client.active_orderbook_subs.insert(instrument_id, ());
3022
3023        // Start the refresh task and wait for a snapshot to be applied and emitted.
3024        client.start_orderbook_refresh_task().unwrap();
3025
3026        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3027        let mut saw_snapshot_event = false;
3028
3029        while std::time::Instant::now() < deadline {
3030            if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3031                tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3032            {
3033                saw_snapshot_event = true;
3034                break;
3035            }
3036        }
3037
3038        assert!(
3039            saw_snapshot_event,
3040            "expected at least one snapshot deltas event from refresh task"
3041        );
3042
3043        // Verify that the local orderbook has been updated with the snapshot.
3044        let book = client
3045            .order_books
3046            .get(&instrument_id)
3047            .expect("orderbook should exist after refresh");
3048        let best_bid = book.best_bid_price().expect("best bid should be set");
3049        let best_ask = book.best_ask_price().expect("best ask should be set");
3050
3051        assert_eq!(best_bid, Price::from("100.00"));
3052        assert_eq!(best_ask, Price::from("101.00"));
3053    }
3054
3055    #[rstest]
3056    fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3057        // Test scenario: bid_size > ask_size
3058        // Expected: DELETE ask, UPDATE bid (reduce by ask_size)
3059        let instrument = create_test_instrument_any();
3060        let instrument_id = instrument.id();
3061        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3062        let ts_init = get_atomic_clock_realtime().get_time_ns();
3063
3064        // Create initial non-crossed book
3065        let initial_deltas = vec![
3066            OrderBookDelta::new(
3067                instrument_id,
3068                BookAction::Add,
3069                BookOrder::new(
3070                    OrderSide::Buy,
3071                    Price::from("99.00"),
3072                    Quantity::from("1.0"),
3073                    0,
3074                ),
3075                0,
3076                0,
3077                ts_init,
3078                ts_init,
3079            ),
3080            OrderBookDelta::new(
3081                instrument_id,
3082                BookAction::Add,
3083                BookOrder::new(
3084                    OrderSide::Sell,
3085                    Price::from("101.00"),
3086                    Quantity::from("2.0"),
3087                    0,
3088                ),
3089                0,
3090                0,
3091                ts_init,
3092                ts_init,
3093            ),
3094        ];
3095        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3096            .unwrap();
3097
3098        // Create crossed orderbook: bid @ 102.00 (size 5.0) > ask @ 101.00 (size 2.0)
3099        let crossed_deltas = vec![OrderBookDelta::new(
3100            instrument_id,
3101            BookAction::Add,
3102            BookOrder::new(
3103                OrderSide::Buy,
3104                Price::from("102.00"),
3105                Quantity::from("5.0"),
3106                0,
3107            ),
3108            0,
3109            0,
3110            ts_init,
3111            ts_init,
3112        )];
3113        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3114
3115        let resolved =
3116            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3117                .unwrap();
3118
3119        // Verify resolution: ask @ 101.00 should be deleted
3120        // bid @ 102.00 should remain but reduced (note: precision affects exact value)
3121        assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3122        assert!(book.best_bid_size().unwrap().as_f64() < 5.0); // Reduced from original
3123        assert!(
3124            book.best_ask_price().is_none()
3125                || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3126        ); // No longer crossed
3127
3128        // Verify synthetic deltas were generated
3129        assert!(resolved.deltas.len() > 1); // Original delta + synthetic resolution deltas
3130    }
3131
3132    #[rstest]
3133    fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3134        // Test scenario: bid_size < ask_size
3135        // Expected: DELETE bid, UPDATE ask (reduce by bid_size)
3136        let instrument = create_test_instrument_any();
3137        let instrument_id = instrument.id();
3138        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3139        let ts_init = get_atomic_clock_realtime().get_time_ns();
3140
3141        // Create initial non-crossed book
3142        let initial_deltas = vec![
3143            OrderBookDelta::new(
3144                instrument_id,
3145                BookAction::Add,
3146                BookOrder::new(
3147                    OrderSide::Buy,
3148                    Price::from("99.00"),
3149                    Quantity::from("1.0"),
3150                    0,
3151                ),
3152                0,
3153                0,
3154                ts_init,
3155                ts_init,
3156            ),
3157            OrderBookDelta::new(
3158                instrument_id,
3159                BookAction::Add,
3160                BookOrder::new(
3161                    OrderSide::Sell,
3162                    Price::from("101.00"),
3163                    Quantity::from("5.0"),
3164                    0,
3165                ),
3166                0,
3167                0,
3168                ts_init,
3169                ts_init,
3170            ),
3171        ];
3172        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3173            .unwrap();
3174
3175        // Create crossed orderbook: bid @ 102.00 (size 2.0) < ask @ 101.00 (size 5.0)
3176        let crossed_deltas = vec![OrderBookDelta::new(
3177            instrument_id,
3178            BookAction::Add,
3179            BookOrder::new(
3180                OrderSide::Buy,
3181                Price::from("102.00"),
3182                Quantity::from("2.0"),
3183                0,
3184            ),
3185            0,
3186            0,
3187            ts_init,
3188            ts_init,
3189        )];
3190        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3191
3192        let resolved =
3193            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3194                .unwrap();
3195
3196        // Verify resolution: bid @ 102.00 should be deleted, ask @ 101.00 reduced
3197        assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3198        assert!(book.best_ask_size().unwrap().as_f64() < 5.0); // Reduced from original
3199        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level remains
3200        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); // No longer crossed
3201
3202        // Verify synthetic deltas were generated
3203        assert!(resolved.deltas.len() > 1);
3204    }
3205
3206    #[rstest]
3207    fn test_resolve_crossed_order_book_equal_sizes() {
3208        // Test scenario: bid_size == ask_size
3209        // Expected: DELETE both bid and ask
3210        let instrument = create_test_instrument_any();
3211        let instrument_id = instrument.id();
3212        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3213        let ts_init = get_atomic_clock_realtime().get_time_ns();
3214
3215        // Create initial non-crossed book with multiple levels
3216        let initial_deltas = vec![
3217            OrderBookDelta::new(
3218                instrument_id,
3219                BookAction::Add,
3220                BookOrder::new(
3221                    OrderSide::Buy,
3222                    Price::from("99.00"),
3223                    Quantity::from("1.0"),
3224                    0,
3225                ),
3226                0,
3227                0,
3228                ts_init,
3229                ts_init,
3230            ),
3231            OrderBookDelta::new(
3232                instrument_id,
3233                BookAction::Add,
3234                BookOrder::new(
3235                    OrderSide::Sell,
3236                    Price::from("101.00"),
3237                    Quantity::from("3.0"),
3238                    0,
3239                ),
3240                0,
3241                0,
3242                ts_init,
3243                ts_init,
3244            ),
3245        ];
3246        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3247            .unwrap();
3248
3249        // Create crossed orderbook: bid @ 102.00 (size 3.0) == ask @ 101.00 (size 3.0)
3250        let crossed_deltas = vec![OrderBookDelta::new(
3251            instrument_id,
3252            BookAction::Add,
3253            BookOrder::new(
3254                OrderSide::Buy,
3255                Price::from("102.00"),
3256                Quantity::from("3.0"),
3257                0,
3258            ),
3259            0,
3260            0,
3261            ts_init,
3262            ts_init,
3263        )];
3264        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3265
3266        let resolved =
3267            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3268                .unwrap();
3269
3270        // Verify resolution: both crossed levels should be deleted, reverting to deeper levels
3271        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level
3272        // Ask at 101.00 should be deleted, book may be empty on ask side or have deeper levels
3273        if let Some(ask_price) = book.best_ask_price() {
3274            assert!(ask_price > book.best_bid_price().unwrap()); // No longer crossed
3275        }
3276
3277        // Verify synthetic deltas were generated
3278        assert!(resolved.deltas.len() > 1);
3279    }
3280
3281    #[rstest]
3282    fn test_resolve_crossed_order_book_multiple_iterations() {
3283        // Test scenario: multiple crossed levels requiring multiple iterations
3284        let instrument = create_test_instrument_any();
3285        let instrument_id = instrument.id();
3286        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3287        let ts_init = get_atomic_clock_realtime().get_time_ns();
3288
3289        // Create initial book with multiple levels on both sides
3290        let initial_deltas = vec![
3291            OrderBookDelta::new(
3292                instrument_id,
3293                BookAction::Add,
3294                BookOrder::new(
3295                    OrderSide::Buy,
3296                    Price::from("98.00"),
3297                    Quantity::from("1.0"),
3298                    0,
3299                ),
3300                0,
3301                0,
3302                ts_init,
3303                ts_init,
3304            ),
3305            OrderBookDelta::new(
3306                instrument_id,
3307                BookAction::Add,
3308                BookOrder::new(
3309                    OrderSide::Sell,
3310                    Price::from("100.00"),
3311                    Quantity::from("1.0"),
3312                    0,
3313                ),
3314                0,
3315                0,
3316                ts_init,
3317                ts_init,
3318            ),
3319            OrderBookDelta::new(
3320                instrument_id,
3321                BookAction::Add,
3322                BookOrder::new(
3323                    OrderSide::Sell,
3324                    Price::from("101.00"),
3325                    Quantity::from("1.0"),
3326                    0,
3327                ),
3328                0,
3329                0,
3330                ts_init,
3331                ts_init,
3332            ),
3333        ];
3334        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3335            .unwrap();
3336
3337        // Create heavily crossed orderbook with multiple bids above asks
3338        let crossed_deltas = vec![
3339            OrderBookDelta::new(
3340                instrument_id,
3341                BookAction::Add,
3342                BookOrder::new(
3343                    OrderSide::Buy,
3344                    Price::from("102.00"),
3345                    Quantity::from("1.0"),
3346                    0,
3347                ),
3348                0,
3349                0,
3350                ts_init,
3351                ts_init,
3352            ),
3353            OrderBookDelta::new(
3354                instrument_id,
3355                BookAction::Add,
3356                BookOrder::new(
3357                    OrderSide::Buy,
3358                    Price::from("103.00"),
3359                    Quantity::from("1.0"),
3360                    0,
3361                ),
3362                0,
3363                0,
3364                ts_init,
3365                ts_init,
3366            ),
3367        ];
3368        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3369
3370        let resolved =
3371            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3372                .unwrap();
3373
3374        // Verify final state is uncrossed (or book has no asks left)
3375        if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3376            assert!(ask_price > bid_price, "Book should be uncrossed");
3377        }
3378
3379        // Verify multiple synthetic deltas were generated for multiple iterations
3380        assert!(resolved.deltas.len() > 2); // Original deltas + multiple resolution passes
3381    }
3382
3383    #[rstest]
3384    fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3385        // Test scenario: non-crossed orderbook should pass through unchanged
3386        let instrument = create_test_instrument_any();
3387        let instrument_id = instrument.id();
3388        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3389        let ts_init = get_atomic_clock_realtime().get_time_ns();
3390
3391        // Create normal non-crossed book
3392        let initial_deltas = vec![
3393            OrderBookDelta::new(
3394                instrument_id,
3395                BookAction::Add,
3396                BookOrder::new(
3397                    OrderSide::Buy,
3398                    Price::from("99.00"),
3399                    Quantity::from("1.0"),
3400                    0,
3401                ),
3402                0,
3403                0,
3404                ts_init,
3405                ts_init,
3406            ),
3407            OrderBookDelta::new(
3408                instrument_id,
3409                BookAction::Add,
3410                BookOrder::new(
3411                    OrderSide::Sell,
3412                    Price::from("101.00"),
3413                    Quantity::from("1.0"),
3414                    0,
3415                ),
3416                0,
3417                0,
3418                ts_init,
3419                ts_init,
3420            ),
3421        ];
3422        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3423            .unwrap();
3424
3425        // Add another non-crossing level
3426        let new_deltas = vec![OrderBookDelta::new(
3427            instrument_id,
3428            BookAction::Add,
3429            BookOrder::new(
3430                OrderSide::Buy,
3431                Price::from("98.50"),
3432                Quantity::from("2.0"),
3433                0,
3434            ),
3435            0,
3436            0,
3437            ts_init,
3438            ts_init,
3439        )];
3440        let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3441
3442        let original_bid = book.best_bid_price();
3443        let original_ask = book.best_ask_price();
3444
3445        let resolved =
3446            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3447                .unwrap();
3448
3449        // Verify no resolution was needed - deltas should be original only
3450        assert_eq!(resolved.deltas.len(), new_deltas.len());
3451        assert_eq!(book.best_bid_price(), original_bid);
3452        assert_eq!(book.best_ask_price(), original_ask);
3453        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3454    }
3455
3456    // ========================================================================
3457    // request_instruments Tests
3458    // ========================================================================
3459
3460    #[tokio::test]
3461    async fn test_request_instruments_successful_fetch() {
3462        // Test successful fetch of all instruments
3463        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3464        set_data_event_sender(sender);
3465
3466        let client_id = ClientId::from("DYDX-TEST");
3467        let config = DydxDataClientConfig::default();
3468        let http_client = DydxHttpClient::default();
3469        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3470
3471        let request = RequestInstruments::new(
3472            None,
3473            None,
3474            Some(client_id),
3475            Some(*DYDX_VENUE),
3476            UUID4::new(),
3477            get_atomic_clock_realtime().get_time_ns(),
3478            None,
3479        );
3480
3481        // Execute request (spawns async task)
3482        assert!(client.request_instruments(&request).is_ok());
3483
3484        // Wait for response (with timeout)
3485        let timeout = tokio::time::Duration::from_secs(5);
3486        let result = tokio::time::timeout(timeout, rx.recv()).await;
3487
3488        match result {
3489            Ok(Some(DataEvent::Response(resp))) => {
3490                if let DataResponse::Instruments(inst_resp) = resp {
3491                    // Verify response structure
3492                    assert_eq!(inst_resp.correlation_id, request.request_id);
3493                    assert_eq!(inst_resp.client_id, client_id);
3494                    assert_eq!(inst_resp.venue, *DYDX_VENUE);
3495                    assert!(inst_resp.start.is_none());
3496                    assert!(inst_resp.end.is_none());
3497                    // Note: may be empty if HTTP fails, but structure should be correct
3498                }
3499            }
3500            Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3501            Ok(None) => panic!("Channel closed unexpectedly"),
3502            Err(_) => {
3503                // Timeout is acceptable if testnet is unreachable
3504                println!("Test timed out - testnet may be unreachable");
3505            }
3506        }
3507    }
3508
3509    #[tokio::test]
3510    async fn test_request_instruments_empty_response_on_http_error() {
3511        // Test empty response handling when HTTP call fails
3512        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3513        set_data_event_sender(sender);
3514
3515        let client_id = ClientId::from("DYDX-ERROR-TEST");
3516        let config = DydxDataClientConfig {
3517            base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3518            ..Default::default()
3519        };
3520        let http_client = DydxHttpClient::new(
3521            config.base_url_http.clone(),
3522            config.http_timeout_secs,
3523            config.http_proxy_url.clone(),
3524            config.is_testnet,
3525            None,
3526        )
3527        .unwrap();
3528
3529        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3530
3531        let request = RequestInstruments::new(
3532            None,
3533            None,
3534            Some(client_id),
3535            Some(*DYDX_VENUE),
3536            UUID4::new(),
3537            get_atomic_clock_realtime().get_time_ns(),
3538            None,
3539        );
3540
3541        assert!(client.request_instruments(&request).is_ok());
3542
3543        // Should receive empty response on error
3544        let timeout = tokio::time::Duration::from_secs(3);
3545        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3546            tokio::time::timeout(timeout, rx.recv()).await
3547        {
3548            assert!(
3549                resp.data.is_empty(),
3550                "Expected empty instruments on HTTP error"
3551            );
3552            assert_eq!(resp.correlation_id, request.request_id);
3553            assert_eq!(resp.client_id, client_id);
3554        }
3555    }
3556
3557    #[tokio::test]
3558    async fn test_request_instruments_caching() {
3559        // Test instrument caching after fetch
3560        setup_test_env();
3561
3562        let client_id = ClientId::from("DYDX-CACHE-TEST");
3563        let config = DydxDataClientConfig::default();
3564        let http_client = DydxHttpClient::default();
3565        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3566
3567        let initial_cache_size = client.instruments.len();
3568
3569        let request = RequestInstruments::new(
3570            None,
3571            None,
3572            Some(client_id),
3573            Some(*DYDX_VENUE),
3574            UUID4::new(),
3575            get_atomic_clock_realtime().get_time_ns(),
3576            None,
3577        );
3578
3579        assert!(client.request_instruments(&request).is_ok());
3580
3581        // Wait for async task to complete
3582        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3583
3584        // Verify cache populated (if HTTP succeeded)
3585        let final_cache_size = client.instruments.len();
3586        // Cache should be unchanged (empty) if HTTP failed, or populated if succeeded
3587        // We can't assert exact size without mocking, but can verify no panic
3588        assert!(final_cache_size >= initial_cache_size);
3589    }
3590
3591    #[tokio::test]
3592    async fn test_request_instruments_correlation_id_matching() {
3593        // Test correlation_id matching in response
3594        setup_test_env();
3595
3596        let client_id = ClientId::from("DYDX-CORR-TEST");
3597        let config = DydxDataClientConfig::default();
3598        let http_client = DydxHttpClient::default();
3599        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3600
3601        let request_id = UUID4::new();
3602        let request = RequestInstruments::new(
3603            None,
3604            None,
3605            Some(client_id),
3606            Some(*DYDX_VENUE),
3607            request_id,
3608            get_atomic_clock_realtime().get_time_ns(),
3609            None,
3610        );
3611
3612        // Should execute without panic (actual correlation checked in async handler)
3613        assert!(client.request_instruments(&request).is_ok());
3614    }
3615
3616    #[tokio::test]
3617    async fn test_request_instruments_venue_assignment() {
3618        // Test venue assignment
3619        setup_test_env();
3620
3621        let client_id = ClientId::from("DYDX-VENUE-TEST");
3622        let config = DydxDataClientConfig::default();
3623        let http_client = DydxHttpClient::default();
3624        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3625
3626        assert_eq!(client.venue(), *DYDX_VENUE);
3627
3628        let request = RequestInstruments::new(
3629            None,
3630            None,
3631            Some(client_id),
3632            Some(*DYDX_VENUE),
3633            UUID4::new(),
3634            get_atomic_clock_realtime().get_time_ns(),
3635            None,
3636        );
3637
3638        assert!(client.request_instruments(&request).is_ok());
3639    }
3640
3641    #[tokio::test]
3642    async fn test_request_instruments_timestamp_handling() {
3643        // Test timestamp handling (start_nanos, end_nanos)
3644        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3645        set_data_event_sender(sender);
3646
3647        let client_id = ClientId::from("DYDX-TS-TEST");
3648        let config = DydxDataClientConfig::default();
3649        let http_client = DydxHttpClient::default();
3650        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3651
3652        let now = chrono::Utc::now();
3653        let start = Some(now - chrono::Duration::hours(24));
3654        let end = Some(now);
3655
3656        let request = RequestInstruments::new(
3657            start,
3658            end,
3659            Some(client_id),
3660            Some(*DYDX_VENUE),
3661            UUID4::new(),
3662            get_atomic_clock_realtime().get_time_ns(),
3663            None,
3664        );
3665
3666        assert!(client.request_instruments(&request).is_ok());
3667
3668        // Wait for response
3669        let timeout = tokio::time::Duration::from_secs(3);
3670        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3671            tokio::time::timeout(timeout, rx.recv()).await
3672        {
3673            // Verify timestamps are set
3674            assert!(resp.start.unwrap() > 0);
3675            assert!(resp.end.unwrap() > 0);
3676            assert!(resp.start.unwrap() <= resp.end.unwrap());
3677            assert!(resp.ts_init > 0);
3678        }
3679    }
3680
3681    #[tokio::test]
3682    async fn test_request_instruments_with_start_only() {
3683        // Test timestamp handling when only `start` is provided
3684        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3685        set_data_event_sender(sender);
3686
3687        let client_id = ClientId::from("DYDX-TS-START-ONLY");
3688        let config = DydxDataClientConfig::default();
3689        let http_client = DydxHttpClient::default();
3690        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3691
3692        let now = chrono::Utc::now();
3693        let start = Some(now - chrono::Duration::hours(24));
3694
3695        let request = RequestInstruments::new(
3696            start,
3697            None,
3698            Some(client_id),
3699            Some(*DYDX_VENUE),
3700            UUID4::new(),
3701            get_atomic_clock_realtime().get_time_ns(),
3702            None,
3703        );
3704
3705        assert!(client.request_instruments(&request).is_ok());
3706
3707        let timeout = tokio::time::Duration::from_secs(3);
3708        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3709            tokio::time::timeout(timeout, rx.recv()).await
3710        {
3711            assert!(resp.start.is_some());
3712            assert!(resp.end.is_none());
3713            assert!(resp.ts_init > 0);
3714        }
3715    }
3716
3717    #[tokio::test]
3718    async fn test_request_instruments_with_end_only() {
3719        // Test timestamp handling when only `end` is provided
3720        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3721        set_data_event_sender(sender);
3722
3723        let client_id = ClientId::from("DYDX-TS-END-ONLY");
3724        let config = DydxDataClientConfig::default();
3725        let http_client = DydxHttpClient::default();
3726        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3727
3728        let now = chrono::Utc::now();
3729        let end = Some(now);
3730
3731        let request = RequestInstruments::new(
3732            None,
3733            end,
3734            Some(client_id),
3735            Some(*DYDX_VENUE),
3736            UUID4::new(),
3737            get_atomic_clock_realtime().get_time_ns(),
3738            None,
3739        );
3740
3741        assert!(client.request_instruments(&request).is_ok());
3742
3743        let timeout = tokio::time::Duration::from_secs(3);
3744        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3745            tokio::time::timeout(timeout, rx.recv()).await
3746        {
3747            assert!(resp.start.is_none());
3748            assert!(resp.end.is_some());
3749            assert!(resp.ts_init > 0);
3750        }
3751    }
3752
3753    #[tokio::test]
3754    async fn test_request_instruments_client_id_fallback() {
3755        // Test client_id fallback to default when not provided
3756        setup_test_env();
3757
3758        let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3759        let config = DydxDataClientConfig::default();
3760        let http_client = DydxHttpClient::default();
3761        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3762
3763        let request = RequestInstruments::new(
3764            None,
3765            None,
3766            None, // No client_id provided
3767            Some(*DYDX_VENUE),
3768            UUID4::new(),
3769            get_atomic_clock_realtime().get_time_ns(),
3770            None,
3771        );
3772
3773        // Should use client's default client_id
3774        assert!(client.request_instruments(&request).is_ok());
3775    }
3776
3777    #[tokio::test]
3778    async fn test_request_instruments_with_params() {
3779        // Test custom params handling
3780        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3781        set_data_event_sender(sender);
3782
3783        let client_id = ClientId::from("DYDX-PARAMS-TEST");
3784        let config = DydxDataClientConfig::default();
3785        let http_client = DydxHttpClient::default();
3786        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3787
3788        // Create params - just verify they're passed through
3789        let mut params_map = IndexMap::new();
3790        params_map.insert("test_key".to_string(), "test_value".to_string());
3791
3792        let request = RequestInstruments::new(
3793            None,
3794            None,
3795            Some(client_id),
3796            Some(*DYDX_VENUE),
3797            UUID4::new(),
3798            get_atomic_clock_realtime().get_time_ns(),
3799            Some(params_map),
3800        );
3801
3802        assert!(client.request_instruments(&request).is_ok());
3803
3804        // Wait for response
3805        let timeout = tokio::time::Duration::from_secs(3);
3806        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3807            tokio::time::timeout(timeout, rx.recv()).await
3808        {
3809            // Verify params are propagated into the response
3810            assert_eq!(resp.client_id, client_id);
3811            let params = resp
3812                .params
3813                .expect("expected params to be present in InstrumentsResponse");
3814            assert_eq!(
3815                params.get("test_key").map(String::as_str),
3816                Some("test_value")
3817            );
3818        }
3819    }
3820
3821    // ========================================================================
3822    // request_instruments Parameter Combination Tests
3823    // ========================================================================
3824
3825    #[tokio::test]
3826    async fn test_request_instruments_with_start_and_end_range() {
3827        // Test timestamp handling when both start and end are provided
3828        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3829        set_data_event_sender(sender);
3830
3831        let client_id = ClientId::from("DYDX-START-END-RANGE");
3832        let config = DydxDataClientConfig::default();
3833        let http_client = DydxHttpClient::default();
3834        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3835
3836        let now = chrono::Utc::now();
3837        let start = Some(now - chrono::Duration::hours(48));
3838        let end = Some(now - chrono::Duration::hours(24));
3839
3840        let request = RequestInstruments::new(
3841            start,
3842            end,
3843            Some(client_id),
3844            Some(*DYDX_VENUE),
3845            UUID4::new(),
3846            get_atomic_clock_realtime().get_time_ns(),
3847            None,
3848        );
3849
3850        assert!(client.request_instruments(&request).is_ok());
3851
3852        let timeout = tokio::time::Duration::from_secs(3);
3853        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3854            tokio::time::timeout(timeout, rx.recv()).await
3855        {
3856            // Verify both timestamps are present
3857            assert!(
3858                resp.start.is_some(),
3859                "start timestamp should be present when provided"
3860            );
3861            assert!(
3862                resp.end.is_some(),
3863                "end timestamp should be present when provided"
3864            );
3865            assert!(resp.ts_init > 0, "ts_init should always be set");
3866
3867            // Verify start is before end
3868            if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3869                assert!(
3870                    start_ts < end_ts,
3871                    "start timestamp should be before end timestamp"
3872                );
3873            }
3874        }
3875    }
3876
3877    #[tokio::test]
3878    async fn test_request_instruments_different_client_ids() {
3879        // Test that different client_id values are properly handled using a shared channel.
3880        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3881        set_data_event_sender(sender);
3882
3883        let timeout = tokio::time::Duration::from_secs(3);
3884
3885        // First client
3886        let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3887        let config1 = DydxDataClientConfig::default();
3888        let http_client1 = DydxHttpClient::default();
3889        let client1 = DydxDataClient::new(client_id_1, config1, http_client1, None).unwrap();
3890
3891        let request1 = RequestInstruments::new(
3892            None,
3893            None,
3894            Some(client_id_1),
3895            Some(*DYDX_VENUE),
3896            UUID4::new(),
3897            get_atomic_clock_realtime().get_time_ns(),
3898            None,
3899        );
3900
3901        assert!(client1.request_instruments(&request1).is_ok());
3902
3903        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
3904            tokio::time::timeout(timeout, rx.recv()).await
3905        {
3906            assert_eq!(
3907                resp1.client_id, client_id_1,
3908                "Response should contain client_id_1"
3909            );
3910        }
3911
3912        // Second client
3913        let client_id_2 = ClientId::from("DYDX-CLIENT-2");
3914        let config2 = DydxDataClientConfig::default();
3915        let http_client2 = DydxHttpClient::default();
3916        let client2 = DydxDataClient::new(client_id_2, config2, http_client2, None).unwrap();
3917
3918        let request2 = RequestInstruments::new(
3919            None,
3920            None,
3921            Some(client_id_2),
3922            Some(*DYDX_VENUE),
3923            UUID4::new(),
3924            get_atomic_clock_realtime().get_time_ns(),
3925            None,
3926        );
3927
3928        assert!(client2.request_instruments(&request2).is_ok());
3929
3930        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
3931            tokio::time::timeout(timeout, rx.recv()).await
3932        {
3933            assert_eq!(
3934                resp2.client_id, client_id_2,
3935                "Response should contain client_id_2"
3936            );
3937            assert_ne!(
3938                resp2.client_id, client_id_1,
3939                "Different clients should have different client_ids"
3940            );
3941        }
3942    }
3943
3944    #[tokio::test]
3945    async fn test_request_instruments_no_timestamps() {
3946        // Test fetching all current instruments (no start/end filters)
3947        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3948        set_data_event_sender(sender);
3949
3950        let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
3951        let config = DydxDataClientConfig::default();
3952        let http_client = DydxHttpClient::default();
3953        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3954
3955        let request = RequestInstruments::new(
3956            None, // No start filter
3957            None, // No end filter
3958            Some(client_id),
3959            Some(*DYDX_VENUE),
3960            UUID4::new(),
3961            get_atomic_clock_realtime().get_time_ns(),
3962            None,
3963        );
3964
3965        assert!(client.request_instruments(&request).is_ok());
3966
3967        let timeout = tokio::time::Duration::from_secs(5);
3968        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3969            tokio::time::timeout(timeout, rx.recv()).await
3970        {
3971            // Verify no timestamp filters
3972            assert!(
3973                resp.start.is_none(),
3974                "start should be None when not provided"
3975            );
3976            assert!(resp.end.is_none(), "end should be None when not provided");
3977
3978            // Should still get current instruments
3979            assert_eq!(resp.venue, *DYDX_VENUE);
3980            assert_eq!(resp.client_id, client_id);
3981            assert!(resp.ts_init > 0);
3982        }
3983    }
3984
3985    // ========================================================================
3986    // request_instrument Tests
3987    // ========================================================================
3988
3989    #[tokio::test]
3990    async fn test_request_instrument_cache_hit() {
3991        // Test cache hit (instrument already cached)
3992        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3993        set_data_event_sender(sender);
3994
3995        let client_id = ClientId::from("DYDX-CACHE-HIT");
3996        let config = DydxDataClientConfig::default();
3997        let http_client = DydxHttpClient::default();
3998        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3999
4000        // Pre-populate cache with test instrument
4001        let instrument = create_test_instrument_any();
4002        let instrument_id = instrument.id();
4003        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4004        client.instruments.insert(symbol_key, instrument.clone());
4005
4006        let request = RequestInstrument::new(
4007            instrument_id,
4008            None,
4009            None,
4010            Some(client_id),
4011            UUID4::new(),
4012            get_atomic_clock_realtime().get_time_ns(),
4013            None,
4014        );
4015
4016        assert!(client.request_instrument(&request).is_ok());
4017
4018        // Should get immediate response from cache
4019        let timeout = tokio::time::Duration::from_millis(500);
4020        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4021            tokio::time::timeout(timeout, rx.recv()).await
4022        {
4023            assert_eq!(resp.instrument_id, instrument_id);
4024            assert_eq!(resp.client_id, client_id);
4025            assert_eq!(resp.data.id(), instrument_id);
4026        }
4027    }
4028
4029    #[tokio::test]
4030    async fn test_request_instrument_cache_miss() {
4031        // Test cache miss (fetch from API)
4032        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4033        set_data_event_sender(sender);
4034
4035        let client_id = ClientId::from("DYDX-CACHE-MISS");
4036        let config = DydxDataClientConfig::default();
4037        let http_client = DydxHttpClient::default();
4038        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4039
4040        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4041
4042        let request = RequestInstrument::new(
4043            instrument_id,
4044            None,
4045            None,
4046            Some(client_id),
4047            UUID4::new(),
4048            get_atomic_clock_realtime().get_time_ns(),
4049            None,
4050        );
4051
4052        assert!(client.request_instrument(&request).is_ok());
4053
4054        // Wait for async HTTP fetch and response
4055        let timeout = tokio::time::Duration::from_secs(5);
4056        let result = tokio::time::timeout(timeout, rx.recv()).await;
4057
4058        // May timeout if testnet unreachable, but should not panic
4059        match result {
4060            Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4061                assert_eq!(resp.instrument_id, instrument_id);
4062                assert_eq!(resp.client_id, client_id);
4063            }
4064            Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4065            Ok(None) => panic!("Channel closed unexpectedly"),
4066            Err(_) => {
4067                println!("Test timed out - testnet may be unreachable");
4068            }
4069        }
4070    }
4071
4072    #[tokio::test]
4073    async fn test_request_instrument_not_found() {
4074        // Test instrument not found scenario
4075        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4076        set_data_event_sender(sender);
4077
4078        let client_id = ClientId::from("DYDX-NOT-FOUND");
4079        let config = DydxDataClientConfig {
4080            base_url_http: Some("http://invalid-url.local".to_string()),
4081            ..Default::default()
4082        };
4083        let http_client = DydxHttpClient::new(
4084            config.base_url_http.clone(),
4085            config.http_timeout_secs,
4086            config.http_proxy_url.clone(),
4087            config.is_testnet,
4088            None,
4089        )
4090        .unwrap();
4091
4092        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4093
4094        let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4095
4096        let request = RequestInstrument::new(
4097            instrument_id,
4098            None,
4099            None,
4100            Some(client_id),
4101            UUID4::new(),
4102            get_atomic_clock_realtime().get_time_ns(),
4103            None,
4104        );
4105
4106        // Should not panic on invalid instrument
4107        assert!(client.request_instrument(&request).is_ok());
4108
4109        // Note: No response sent when instrument not found (by design)
4110        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4111    }
4112
4113    #[tokio::test]
4114    async fn test_request_instrument_bulk_caching() {
4115        // Test bulk caching when fetching from API
4116        setup_test_env();
4117
4118        let client_id = ClientId::from("DYDX-BULK-CACHE");
4119        let config = DydxDataClientConfig::default();
4120        let http_client = DydxHttpClient::default();
4121        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4122
4123        let initial_cache_size = client.instruments.len();
4124
4125        let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4126
4127        let request = RequestInstrument::new(
4128            instrument_id,
4129            None,
4130            None,
4131            Some(client_id),
4132            UUID4::new(),
4133            get_atomic_clock_realtime().get_time_ns(),
4134            None,
4135        );
4136
4137        assert!(client.request_instrument(&request).is_ok());
4138
4139        // Wait for async bulk fetch
4140        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4141
4142        // Verify cache populated with all instruments (if HTTP succeeded)
4143        let final_cache_size = client.instruments.len();
4144        assert!(final_cache_size >= initial_cache_size);
4145        // If HTTP succeeded, cache should have multiple instruments
4146    }
4147
4148    #[tokio::test]
4149    async fn test_request_instrument_correlation_id() {
4150        // Test correlation_id matching
4151        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4152        set_data_event_sender(sender);
4153
4154        let client_id = ClientId::from("DYDX-CORR-ID");
4155        let config = DydxDataClientConfig::default();
4156        let http_client = DydxHttpClient::default();
4157        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4158
4159        // Pre-populate cache to get immediate response
4160        let instrument = create_test_instrument_any();
4161        let instrument_id = instrument.id();
4162        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4163        client.instruments.insert(symbol_key, instrument.clone());
4164
4165        let request_id = UUID4::new();
4166        let request = RequestInstrument::new(
4167            instrument_id,
4168            None,
4169            None,
4170            Some(client_id),
4171            request_id,
4172            get_atomic_clock_realtime().get_time_ns(),
4173            None,
4174        );
4175
4176        assert!(client.request_instrument(&request).is_ok());
4177
4178        // Verify correlation_id matches
4179        let timeout = tokio::time::Duration::from_millis(500);
4180        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4181            tokio::time::timeout(timeout, rx.recv()).await
4182        {
4183            assert_eq!(resp.correlation_id, request_id);
4184        }
4185    }
4186
4187    #[tokio::test]
4188    async fn test_request_instrument_response_format_boxed() {
4189        // Verify InstrumentResponse format (boxed)
4190        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4191        set_data_event_sender(sender);
4192
4193        let client_id = ClientId::from("DYDX-BOXED");
4194        let config = DydxDataClientConfig::default();
4195        let http_client = DydxHttpClient::default();
4196        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4197
4198        // Pre-populate cache
4199        let instrument = create_test_instrument_any();
4200        let instrument_id = instrument.id();
4201        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4202        client.instruments.insert(symbol_key, instrument.clone());
4203
4204        let request = RequestInstrument::new(
4205            instrument_id,
4206            None,
4207            None,
4208            Some(client_id),
4209            UUID4::new(),
4210            get_atomic_clock_realtime().get_time_ns(),
4211            None,
4212        );
4213
4214        assert!(client.request_instrument(&request).is_ok());
4215
4216        // Verify response is properly boxed
4217        let timeout = tokio::time::Duration::from_millis(500);
4218        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4219            tokio::time::timeout(timeout, rx.recv()).await
4220        {
4221            // Verify boxed response structure
4222            assert_eq!(boxed_resp.instrument_id, instrument_id);
4223            assert_eq!(boxed_resp.client_id, client_id);
4224            assert!(boxed_resp.start.is_none());
4225            assert!(boxed_resp.end.is_none());
4226            assert!(boxed_resp.ts_init > 0);
4227        }
4228    }
4229
4230    #[rstest]
4231    fn test_request_instrument_symbol_extraction() {
4232        // Test symbol extraction from InstrumentId
4233        setup_test_env();
4234
4235        let client_id = ClientId::from("DYDX-SYMBOL");
4236        let config = DydxDataClientConfig::default();
4237        let http_client = DydxHttpClient::default();
4238        let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4239
4240        // Test various instrument ID formats
4241        // Note: Symbol includes the -PERP suffix in dYdX
4242        let test_cases = vec![
4243            ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4244            ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4245            ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4246        ];
4247
4248        for (instrument_id_str, expected_symbol) in test_cases {
4249            let instrument_id = InstrumentId::from(instrument_id_str);
4250            let symbol = Ustr::from(instrument_id.symbol.as_str());
4251            assert_eq!(symbol.as_str(), expected_symbol);
4252        }
4253    }
4254
4255    #[tokio::test]
4256    async fn test_request_instrument_client_id_fallback() {
4257        // Test client_id fallback to default
4258        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4259        set_data_event_sender(sender);
4260
4261        let client_id = ClientId::from("DYDX-FALLBACK");
4262        let config = DydxDataClientConfig::default();
4263        let http_client = DydxHttpClient::default();
4264        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4265
4266        // Pre-populate cache
4267        let instrument = create_test_instrument_any();
4268        let instrument_id = instrument.id();
4269        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4270        client.instruments.insert(symbol_key, instrument.clone());
4271
4272        let request = RequestInstrument::new(
4273            instrument_id,
4274            None,
4275            None,
4276            None, // No client_id provided
4277            UUID4::new(),
4278            get_atomic_clock_realtime().get_time_ns(),
4279            None,
4280        );
4281
4282        assert!(client.request_instrument(&request).is_ok());
4283
4284        // Should use client's default client_id
4285        let timeout = tokio::time::Duration::from_millis(500);
4286        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4287            tokio::time::timeout(timeout, rx.recv()).await
4288        {
4289            assert_eq!(resp.client_id, client_id);
4290        }
4291    }
4292
4293    // ========================================================================
4294    // request_trades Tests
4295    // ========================================================================
4296
4297    #[tokio::test]
4298    async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4299        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4300        set_data_event_sender(sender);
4301
4302        let created_at = chrono::Utc::now();
4303
4304        let http_trade = crate::http::models::Trade {
4305            id: "trade-1".to_string(),
4306            side: OrderSide::Buy,
4307            size: dec!(1.5),
4308            price: dec!(100.25),
4309            created_at,
4310            created_at_height: 1,
4311            trade_type: crate::common::enums::DydxTradeType::Limit,
4312        };
4313
4314        let trades_response = crate::http::models::TradesResponse {
4315            trades: vec![http_trade],
4316        };
4317
4318        let state = TradesTestState {
4319            response: Arc::new(trades_response),
4320            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4321            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4322        };
4323
4324        let addr = start_trades_test_server(state.clone()).await;
4325        let base_url = format!("http://{}", addr);
4326
4327        let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4328        let config = DydxDataClientConfig {
4329            base_url_http: Some(base_url),
4330            is_testnet: true,
4331            ..Default::default()
4332        };
4333
4334        let http_client = DydxHttpClient::new(
4335            config.base_url_http.clone(),
4336            config.http_timeout_secs,
4337            config.http_proxy_url.clone(),
4338            config.is_testnet,
4339            None,
4340        )
4341        .unwrap();
4342
4343        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4344
4345        let instrument = create_test_instrument_any();
4346        let instrument_id = instrument.id();
4347        let price_precision = instrument.price_precision();
4348        let size_precision = instrument.size_precision();
4349        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4350        client.instruments.insert(symbol_key, instrument);
4351
4352        let request_id = UUID4::new();
4353        let now = chrono::Utc::now();
4354        let start = Some(now - chrono::Duration::seconds(10));
4355        let end = Some(now + chrono::Duration::seconds(10));
4356        let limit = std::num::NonZeroUsize::new(100).unwrap();
4357
4358        let request = RequestTrades::new(
4359            instrument_id,
4360            start,
4361            end,
4362            Some(limit),
4363            Some(client_id),
4364            request_id,
4365            get_atomic_clock_realtime().get_time_ns(),
4366            None,
4367        );
4368
4369        assert!(client.request_trades(&request).is_ok());
4370
4371        let timeout = tokio::time::Duration::from_secs(1);
4372        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4373            tokio::time::timeout(timeout, rx.recv()).await
4374        {
4375            assert_eq!(resp.correlation_id, request_id);
4376            assert_eq!(resp.client_id, client_id);
4377            assert_eq!(resp.instrument_id, instrument_id);
4378            assert_eq!(resp.data.len(), 1);
4379
4380            let tick = &resp.data[0];
4381            assert_eq!(tick.instrument_id, instrument_id);
4382            assert_eq!(tick.price, Price::new(100.25, price_precision));
4383            assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4384            assert_eq!(tick.trade_id.to_string(), "trade-1");
4385
4386            use nautilus_model::enums::AggressorSide;
4387            assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4388        } else {
4389            panic!("did not receive trades response in time");
4390        }
4391
4392        // Verify symbol conversion (strip -PERP suffix) and limit propagation.
4393        let last_ticker = state.last_ticker.lock().await.clone();
4394        assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4395
4396        let last_limit = *state.last_limit.lock().await;
4397        assert_eq!(last_limit, Some(Some(100)));
4398    }
4399
4400    #[tokio::test]
4401    async fn test_request_trades_empty_response_and_no_limit() {
4402        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4403        set_data_event_sender(sender);
4404
4405        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4406
4407        let state = TradesTestState {
4408            response: Arc::new(trades_response),
4409            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4410            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4411        };
4412
4413        let addr = start_trades_test_server(state.clone()).await;
4414        let base_url = format!("http://{}", addr);
4415
4416        let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4417        let config = DydxDataClientConfig {
4418            base_url_http: Some(base_url),
4419            is_testnet: true,
4420            ..Default::default()
4421        };
4422
4423        let http_client = DydxHttpClient::new(
4424            config.base_url_http.clone(),
4425            config.http_timeout_secs,
4426            config.http_proxy_url.clone(),
4427            config.is_testnet,
4428            None,
4429        )
4430        .unwrap();
4431
4432        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4433
4434        let instrument = create_test_instrument_any();
4435        let instrument_id = instrument.id();
4436        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4437        client.instruments.insert(symbol_key, instrument);
4438
4439        let request_id = UUID4::new();
4440
4441        let request = RequestTrades::new(
4442            instrument_id,
4443            None,
4444            None,
4445            None, // No limit
4446            Some(client_id),
4447            request_id,
4448            get_atomic_clock_realtime().get_time_ns(),
4449            None,
4450        );
4451
4452        assert!(client.request_trades(&request).is_ok());
4453
4454        let timeout = tokio::time::Duration::from_secs(1);
4455        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4456            tokio::time::timeout(timeout, rx.recv()).await
4457        {
4458            assert_eq!(resp.correlation_id, request_id);
4459            assert_eq!(resp.client_id, client_id);
4460            assert_eq!(resp.instrument_id, instrument_id);
4461            assert!(resp.data.is_empty());
4462        } else {
4463            panic!("did not receive trades response in time");
4464        }
4465
4466        // Verify that no `limit` query parameter was sent.
4467        let last_limit = *state.last_limit.lock().await;
4468        assert_eq!(last_limit, Some(None));
4469    }
4470
4471    #[tokio::test]
4472    async fn test_request_trades_timestamp_filtering() {
4473        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4474        set_data_event_sender(sender);
4475
4476        let now = chrono::Utc::now();
4477        let trade_before = crate::http::models::Trade {
4478            id: "before".to_string(),
4479            side: OrderSide::Buy,
4480            size: dec!(1.0),
4481            price: dec!(100.0),
4482            created_at: now - chrono::Duration::seconds(60),
4483            created_at_height: 1,
4484            trade_type: crate::common::enums::DydxTradeType::Limit,
4485        };
4486        let trade_inside = crate::http::models::Trade {
4487            id: "inside".to_string(),
4488            side: OrderSide::Sell,
4489            size: dec!(2.0),
4490            price: dec!(101.0),
4491            created_at: now,
4492            created_at_height: 2,
4493            trade_type: crate::common::enums::DydxTradeType::Limit,
4494        };
4495        let trade_after = crate::http::models::Trade {
4496            id: "after".to_string(),
4497            side: OrderSide::Buy,
4498            size: dec!(3.0),
4499            price: dec!(102.0),
4500            created_at: now + chrono::Duration::seconds(60),
4501            created_at_height: 3,
4502            trade_type: crate::common::enums::DydxTradeType::Limit,
4503        };
4504
4505        let trades_response = crate::http::models::TradesResponse {
4506            trades: vec![trade_before, trade_inside.clone(), trade_after],
4507        };
4508
4509        let state = TradesTestState {
4510            response: Arc::new(trades_response),
4511            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4512            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4513        };
4514
4515        let addr = start_trades_test_server(state).await;
4516        let base_url = format!("http://{}", addr);
4517
4518        let client_id = ClientId::from("DYDX-TRADES-FILTER");
4519        let config = DydxDataClientConfig {
4520            base_url_http: Some(base_url),
4521            is_testnet: true,
4522            ..Default::default()
4523        };
4524
4525        let http_client = DydxHttpClient::new(
4526            config.base_url_http.clone(),
4527            config.http_timeout_secs,
4528            config.http_proxy_url.clone(),
4529            config.is_testnet,
4530            None,
4531        )
4532        .unwrap();
4533
4534        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4535
4536        let instrument = create_test_instrument_any();
4537        let instrument_id = instrument.id();
4538        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4539        client.instruments.insert(symbol_key, instrument);
4540
4541        let request_id = UUID4::new();
4542
4543        // Filter range includes only the "inside" trade.
4544        let start = Some(now - chrono::Duration::seconds(10));
4545        let end = Some(now + chrono::Duration::seconds(10));
4546
4547        let request = RequestTrades::new(
4548            instrument_id,
4549            start,
4550            end,
4551            None,
4552            Some(client_id),
4553            request_id,
4554            get_atomic_clock_realtime().get_time_ns(),
4555            None,
4556        );
4557
4558        assert!(client.request_trades(&request).is_ok());
4559
4560        let timeout = tokio::time::Duration::from_secs(1);
4561        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4562            tokio::time::timeout(timeout, rx.recv()).await
4563        {
4564            assert_eq!(resp.correlation_id, request_id);
4565            assert_eq!(resp.client_id, client_id);
4566            assert_eq!(resp.instrument_id, instrument_id);
4567            assert_eq!(resp.data.len(), 1);
4568
4569            let tick = &resp.data[0];
4570            assert_eq!(tick.trade_id.to_string(), "inside");
4571            assert_eq!(tick.price.as_decimal(), dec!(101.0));
4572        } else {
4573            panic!("did not receive trades response in time");
4574        }
4575    }
4576
4577    #[tokio::test]
4578    async fn test_request_trades_correlation_id_matching() {
4579        // Test correlation_id matching in response
4580        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4581        set_data_event_sender(sender);
4582
4583        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4584
4585        let state = TradesTestState {
4586            response: Arc::new(trades_response),
4587            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4588            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4589        };
4590
4591        let addr = start_trades_test_server(state).await;
4592        let base_url = format!("http://{}", addr);
4593
4594        let client_id = ClientId::from("DYDX-TRADES-CORR");
4595        let config = DydxDataClientConfig {
4596            base_url_http: Some(base_url),
4597            is_testnet: true,
4598            ..Default::default()
4599        };
4600
4601        let http_client = DydxHttpClient::new(
4602            config.base_url_http.clone(),
4603            config.http_timeout_secs,
4604            config.http_proxy_url.clone(),
4605            config.is_testnet,
4606            None,
4607        )
4608        .unwrap();
4609
4610        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4611
4612        let instrument = create_test_instrument_any();
4613        let instrument_id = instrument.id();
4614        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4615        client.instruments.insert(symbol_key, instrument);
4616
4617        let request_id = UUID4::new();
4618        let request = RequestTrades::new(
4619            instrument_id,
4620            None,
4621            None,
4622            None,
4623            Some(client_id),
4624            request_id,
4625            get_atomic_clock_realtime().get_time_ns(),
4626            None,
4627        );
4628
4629        assert!(client.request_trades(&request).is_ok());
4630
4631        let timeout = tokio::time::Duration::from_millis(500);
4632        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4633            tokio::time::timeout(timeout, rx.recv()).await
4634        {
4635            assert_eq!(resp.correlation_id, request_id);
4636        }
4637    }
4638
4639    #[tokio::test]
4640    async fn test_request_trades_response_format() {
4641        // Verify TradesResponse format
4642        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4643        set_data_event_sender(sender);
4644
4645        let created_at = chrono::Utc::now();
4646        let http_trade = crate::http::models::Trade {
4647            id: "format-test".to_string(),
4648            side: OrderSide::Sell,
4649            size: dec!(5.0),
4650            price: dec!(200.0),
4651            created_at,
4652            created_at_height: 100,
4653            trade_type: crate::common::enums::DydxTradeType::Limit,
4654        };
4655
4656        let trades_response = crate::http::models::TradesResponse {
4657            trades: vec![http_trade],
4658        };
4659
4660        let state = TradesTestState {
4661            response: Arc::new(trades_response),
4662            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4663            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4664        };
4665
4666        let addr = start_trades_test_server(state).await;
4667        let base_url = format!("http://{}", addr);
4668
4669        let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4670        let config = DydxDataClientConfig {
4671            base_url_http: Some(base_url),
4672            is_testnet: true,
4673            ..Default::default()
4674        };
4675
4676        let http_client = DydxHttpClient::new(
4677            config.base_url_http.clone(),
4678            config.http_timeout_secs,
4679            config.http_proxy_url.clone(),
4680            config.is_testnet,
4681            None,
4682        )
4683        .unwrap();
4684
4685        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4686
4687        let instrument = create_test_instrument_any();
4688        let instrument_id = instrument.id();
4689        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4690        client.instruments.insert(symbol_key, instrument);
4691
4692        let request = RequestTrades::new(
4693            instrument_id,
4694            None,
4695            None,
4696            None,
4697            Some(client_id),
4698            UUID4::new(),
4699            get_atomic_clock_realtime().get_time_ns(),
4700            None,
4701        );
4702
4703        assert!(client.request_trades(&request).is_ok());
4704
4705        let timeout = tokio::time::Duration::from_millis(500);
4706        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4707            tokio::time::timeout(timeout, rx.recv()).await
4708        {
4709            // Verify response structure
4710            assert_eq!(resp.client_id, client_id);
4711            assert_eq!(resp.instrument_id, instrument_id);
4712            assert!(resp.data.len() == 1);
4713            assert!(resp.ts_init > 0);
4714
4715            // Verify trade tick structure
4716            let tick = &resp.data[0];
4717            assert_eq!(tick.instrument_id, instrument_id);
4718            assert!(tick.ts_event > 0);
4719            assert!(tick.ts_init > 0);
4720        }
4721    }
4722
4723    #[tokio::test]
4724    async fn test_request_trades_no_instrument_in_cache() {
4725        // Test empty response when instrument not in cache
4726        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4727        set_data_event_sender(sender);
4728
4729        let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4730        let config = DydxDataClientConfig::default();
4731        let http_client = DydxHttpClient::default();
4732        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4733
4734        // Don't add instrument to cache
4735        let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4736
4737        let request = RequestTrades::new(
4738            instrument_id,
4739            None,
4740            None,
4741            None,
4742            Some(client_id),
4743            UUID4::new(),
4744            get_atomic_clock_realtime().get_time_ns(),
4745            None,
4746        );
4747
4748        assert!(client.request_trades(&request).is_ok());
4749
4750        // Should receive empty response when instrument not found
4751        let timeout = tokio::time::Duration::from_millis(500);
4752        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4753            tokio::time::timeout(timeout, rx.recv()).await
4754        {
4755            assert!(resp.data.is_empty());
4756        }
4757    }
4758
4759    #[tokio::test]
4760    async fn test_request_trades_limit_parameter() {
4761        // Test limit parameter handling
4762        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4763        set_data_event_sender(sender);
4764
4765        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4766
4767        let state = TradesTestState {
4768            response: Arc::new(trades_response),
4769            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4770            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4771        };
4772
4773        let addr = start_trades_test_server(state.clone()).await;
4774        let base_url = format!("http://{}", addr);
4775
4776        let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4777        let config = DydxDataClientConfig {
4778            base_url_http: Some(base_url),
4779            is_testnet: true,
4780            ..Default::default()
4781        };
4782
4783        let http_client = DydxHttpClient::new(
4784            config.base_url_http.clone(),
4785            config.http_timeout_secs,
4786            config.http_proxy_url.clone(),
4787            config.is_testnet,
4788            None,
4789        )
4790        .unwrap();
4791
4792        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4793
4794        let instrument = create_test_instrument_any();
4795        let instrument_id = instrument.id();
4796        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4797        client.instruments.insert(symbol_key, instrument);
4798
4799        // Test with limit
4800        let limit = std::num::NonZeroUsize::new(500).unwrap();
4801        let request = RequestTrades::new(
4802            instrument_id,
4803            None,
4804            None,
4805            Some(limit),
4806            Some(client_id),
4807            UUID4::new(),
4808            get_atomic_clock_realtime().get_time_ns(),
4809            None,
4810        );
4811
4812        assert!(client.request_trades(&request).is_ok());
4813        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
4814
4815        // Verify limit was passed to HTTP client
4816        let last_limit = *state.last_limit.lock().await;
4817        assert_eq!(last_limit, Some(Some(500)));
4818    }
4819
4820    #[rstest]
4821    fn test_request_trades_symbol_conversion() {
4822        // Test symbol conversion (strip -PERP suffix)
4823        setup_test_env();
4824
4825        let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4826        let config = DydxDataClientConfig::default();
4827        let http_client = DydxHttpClient::default();
4828        let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4829
4830        // Verify symbol format for various instruments
4831        let test_cases = vec![
4832            ("BTC-USD-PERP.DYDX", "BTC-USD"),
4833            ("ETH-USD-PERP.DYDX", "ETH-USD"),
4834            ("SOL-USD-PERP.DYDX", "SOL-USD"),
4835        ];
4836
4837        for (instrument_id_str, expected_ticker) in test_cases {
4838            let instrument_id = InstrumentId::from(instrument_id_str);
4839            let ticker = instrument_id
4840                .symbol
4841                .as_str()
4842                .trim_end_matches("-PERP")
4843                .to_string();
4844            assert_eq!(ticker, expected_ticker);
4845        }
4846    }
4847
4848    // ========================================================================
4849    // HTTP Error Handling Tests
4850    // ========================================================================
4851
4852    #[tokio::test]
4853    async fn test_http_404_handling() {
4854        // Test HTTP 404 handling (instrument not found)
4855        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4856        set_data_event_sender(sender);
4857
4858        let client_id = ClientId::from("DYDX-404");
4859        let config = DydxDataClientConfig {
4860            base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4861            http_timeout_secs: Some(1),
4862            ..Default::default()
4863        };
4864
4865        let http_client = DydxHttpClient::new(
4866            config.base_url_http.clone(),
4867            config.http_timeout_secs,
4868            config.http_proxy_url.clone(),
4869            config.is_testnet,
4870            None,
4871        )
4872        .unwrap();
4873
4874        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4875
4876        let request = RequestInstruments::new(
4877            None,
4878            None,
4879            Some(client_id),
4880            Some(*DYDX_VENUE),
4881            UUID4::new(),
4882            get_atomic_clock_realtime().get_time_ns(),
4883            None,
4884        );
4885
4886        assert!(client.request_instruments(&request).is_ok());
4887
4888        // Should receive empty response on 404
4889        let timeout = tokio::time::Duration::from_secs(2);
4890        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4891            tokio::time::timeout(timeout, rx.recv()).await
4892        {
4893            assert!(resp.data.is_empty(), "Expected empty response on 404");
4894        }
4895    }
4896
4897    #[tokio::test]
4898    async fn test_http_500_handling() {
4899        // Test HTTP 500 handling (internal server error)
4900        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4901        set_data_event_sender(sender);
4902
4903        let client_id = ClientId::from("DYDX-500");
4904        let config = DydxDataClientConfig {
4905            base_url_http: Some("http://httpstat.us/500".to_string()),
4906            http_timeout_secs: Some(2),
4907            ..Default::default()
4908        };
4909
4910        let http_client = DydxHttpClient::new(
4911            config.base_url_http.clone(),
4912            config.http_timeout_secs,
4913            config.http_proxy_url.clone(),
4914            config.is_testnet,
4915            None,
4916        )
4917        .unwrap();
4918
4919        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4920
4921        let instrument = create_test_instrument_any();
4922        let instrument_id = instrument.id();
4923        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4924        client.instruments.insert(symbol_key, instrument);
4925
4926        let request = RequestTrades::new(
4927            instrument_id,
4928            None,
4929            None,
4930            None,
4931            Some(client_id),
4932            UUID4::new(),
4933            get_atomic_clock_realtime().get_time_ns(),
4934            None,
4935        );
4936
4937        assert!(client.request_trades(&request).is_ok());
4938
4939        // Should receive empty response on 500 error
4940        let timeout = tokio::time::Duration::from_secs(3);
4941        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4942            tokio::time::timeout(timeout, rx.recv()).await
4943        {
4944            assert!(resp.data.is_empty(), "Expected empty response on 500");
4945        }
4946    }
4947
4948    #[tokio::test]
4949    async fn test_network_timeout_handling() {
4950        // Test network timeout scenarios
4951        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4952        set_data_event_sender(sender);
4953
4954        let client_id = ClientId::from("DYDX-TIMEOUT");
4955        let config = DydxDataClientConfig {
4956            base_url_http: Some("http://10.255.255.1:81".to_string()), // Non-routable IP
4957            http_timeout_secs: Some(1),                                // Very short timeout
4958            ..Default::default()
4959        };
4960
4961        let http_client = DydxHttpClient::new(
4962            config.base_url_http.clone(),
4963            config.http_timeout_secs,
4964            config.http_proxy_url.clone(),
4965            config.is_testnet,
4966            None,
4967        )
4968        .unwrap();
4969
4970        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4971
4972        let request = RequestInstruments::new(
4973            None,
4974            None,
4975            Some(client_id),
4976            Some(*DYDX_VENUE),
4977            UUID4::new(),
4978            get_atomic_clock_realtime().get_time_ns(),
4979            None,
4980        );
4981
4982        assert!(client.request_instruments(&request).is_ok());
4983
4984        // Should timeout and return empty response
4985        let timeout = tokio::time::Duration::from_secs(3);
4986        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4987            tokio::time::timeout(timeout, rx.recv()).await
4988        {
4989            assert!(resp.data.is_empty(), "Expected empty response on timeout");
4990        }
4991    }
4992
4993    #[tokio::test]
4994    async fn test_connection_refused_handling() {
4995        // Test connection refused errors
4996        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4997        set_data_event_sender(sender);
4998
4999        let client_id = ClientId::from("DYDX-REFUSED");
5000        let config = DydxDataClientConfig {
5001            base_url_http: Some("http://localhost:9999".to_string()), // Port unlikely to be open
5002            http_timeout_secs: Some(1),
5003            ..Default::default()
5004        };
5005
5006        let http_client = DydxHttpClient::new(
5007            config.base_url_http.clone(),
5008            config.http_timeout_secs,
5009            config.http_proxy_url.clone(),
5010            config.is_testnet,
5011            None,
5012        )
5013        .unwrap();
5014
5015        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5016
5017        let instrument = create_test_instrument_any();
5018        let instrument_id = instrument.id();
5019        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5020        client.instruments.insert(symbol_key, instrument);
5021
5022        let request = RequestInstrument::new(
5023            instrument_id,
5024            None,
5025            None,
5026            Some(client_id),
5027            UUID4::new(),
5028            get_atomic_clock_realtime().get_time_ns(),
5029            None,
5030        );
5031
5032        assert!(client.request_instrument(&request).is_ok());
5033
5034        // Should handle connection refused gracefully
5035        let timeout = tokio::time::Duration::from_secs(2);
5036        let result = tokio::time::timeout(timeout, rx.recv()).await;
5037
5038        // May not receive response if connection fails before spawning handler
5039        // This is acceptable - the important part is no panic
5040        match result {
5041            Ok(Some(DataEvent::Response(_))) => {
5042                // Response received (empty data expected)
5043            }
5044            Ok(None) | Err(_) => {
5045                // No response or timeout - acceptable for connection refused
5046            }
5047            _ => {}
5048        }
5049    }
5050
5051    #[tokio::test]
5052    async fn test_dns_resolution_failure_handling() {
5053        // Test DNS resolution failures
5054        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5055        set_data_event_sender(sender);
5056
5057        let client_id = ClientId::from("DYDX-DNS");
5058        let config = DydxDataClientConfig {
5059            base_url_http: Some(
5060                "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5061            ),
5062            http_timeout_secs: Some(2),
5063            ..Default::default()
5064        };
5065
5066        let http_client = DydxHttpClient::new(
5067            config.base_url_http.clone(),
5068            config.http_timeout_secs,
5069            config.http_proxy_url.clone(),
5070            config.is_testnet,
5071            None,
5072        )
5073        .unwrap();
5074
5075        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5076
5077        let request = RequestInstruments::new(
5078            None,
5079            None,
5080            Some(client_id),
5081            Some(*DYDX_VENUE),
5082            UUID4::new(),
5083            get_atomic_clock_realtime().get_time_ns(),
5084            None,
5085        );
5086
5087        assert!(client.request_instruments(&request).is_ok());
5088
5089        // Should handle DNS failure gracefully with empty response
5090        let timeout = tokio::time::Duration::from_secs(3);
5091        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5092            tokio::time::timeout(timeout, rx.recv()).await
5093        {
5094            assert!(
5095                resp.data.is_empty(),
5096                "Expected empty response on DNS failure"
5097            );
5098        }
5099    }
5100
5101    #[tokio::test]
5102    async fn test_http_502_503_handling() {
5103        // Test HTTP 502/503 handling (bad gateway/service unavailable)
5104        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5105        set_data_event_sender(sender);
5106
5107        let client_id = ClientId::from("DYDX-503");
5108        let config = DydxDataClientConfig {
5109            base_url_http: Some("http://httpstat.us/503".to_string()),
5110            http_timeout_secs: Some(2),
5111            ..Default::default()
5112        };
5113
5114        let http_client = DydxHttpClient::new(
5115            config.base_url_http.clone(),
5116            config.http_timeout_secs,
5117            config.http_proxy_url.clone(),
5118            config.is_testnet,
5119            None,
5120        )
5121        .unwrap();
5122
5123        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5124
5125        let request = RequestInstruments::new(
5126            None,
5127            None,
5128            Some(client_id),
5129            Some(*DYDX_VENUE),
5130            UUID4::new(),
5131            get_atomic_clock_realtime().get_time_ns(),
5132            None,
5133        );
5134
5135        assert!(client.request_instruments(&request).is_ok());
5136
5137        // Should handle 503 gracefully with empty response
5138        let timeout = tokio::time::Duration::from_secs(3);
5139        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5140            tokio::time::timeout(timeout, rx.recv()).await
5141        {
5142            assert!(resp.data.is_empty(), "Expected empty response on 503");
5143        }
5144    }
5145
5146    #[tokio::test]
5147    async fn test_http_429_rate_limit_handling() {
5148        // Test HTTP 429 handling (rate limit exceeded)
5149        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5150        set_data_event_sender(sender);
5151
5152        let client_id = ClientId::from("DYDX-429");
5153        let config = DydxDataClientConfig {
5154            base_url_http: Some("http://httpstat.us/429".to_string()),
5155            http_timeout_secs: Some(2),
5156            ..Default::default()
5157        };
5158
5159        let http_client = DydxHttpClient::new(
5160            config.base_url_http.clone(),
5161            config.http_timeout_secs,
5162            config.http_proxy_url.clone(),
5163            config.is_testnet,
5164            None,
5165        )
5166        .unwrap();
5167
5168        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5169
5170        let instrument = create_test_instrument_any();
5171        let instrument_id = instrument.id();
5172        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5173        client.instruments.insert(symbol_key, instrument);
5174
5175        let request = RequestTrades::new(
5176            instrument_id,
5177            None,
5178            None,
5179            None,
5180            Some(client_id),
5181            UUID4::new(),
5182            get_atomic_clock_realtime().get_time_ns(),
5183            None,
5184        );
5185
5186        assert!(client.request_trades(&request).is_ok());
5187
5188        // Should handle rate limit with empty response
5189        let timeout = tokio::time::Duration::from_secs(3);
5190        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5191            tokio::time::timeout(timeout, rx.recv()).await
5192        {
5193            assert!(
5194                resp.data.is_empty(),
5195                "Expected empty response on rate limit"
5196            );
5197        }
5198    }
5199
5200    #[tokio::test]
5201    async fn test_error_handling_does_not_panic() {
5202        // Test that error scenarios don't cause panics
5203        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5204        set_data_event_sender(sender);
5205
5206        let client_id = ClientId::from("DYDX-NO-PANIC");
5207        let config = DydxDataClientConfig {
5208            base_url_http: Some("http://invalid".to_string()),
5209            ..Default::default()
5210        };
5211
5212        let http_client = DydxHttpClient::new(
5213            config.base_url_http.clone(),
5214            config.http_timeout_secs,
5215            config.http_proxy_url.clone(),
5216            config.is_testnet,
5217            None,
5218        )
5219        .unwrap();
5220
5221        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5222
5223        // All these should return Ok() without panicking
5224        let request_instruments = RequestInstruments::new(
5225            None,
5226            None,
5227            Some(client_id),
5228            Some(*DYDX_VENUE),
5229            UUID4::new(),
5230            get_atomic_clock_realtime().get_time_ns(),
5231            None,
5232        );
5233        assert!(client.request_instruments(&request_instruments).is_ok());
5234
5235        let instrument_id = InstrumentId::from("INVALID.DYDX");
5236        let request_instrument = RequestInstrument::new(
5237            instrument_id,
5238            None,
5239            None,
5240            Some(client_id),
5241            UUID4::new(),
5242            get_atomic_clock_realtime().get_time_ns(),
5243            None,
5244        );
5245        assert!(client.request_instrument(&request_instrument).is_ok());
5246
5247        let request_trades = RequestTrades::new(
5248            instrument_id,
5249            None,
5250            None,
5251            None,
5252            Some(client_id),
5253            UUID4::new(),
5254            get_atomic_clock_realtime().get_time_ns(),
5255            None,
5256        );
5257        assert!(client.request_trades(&request_trades).is_ok());
5258    }
5259
5260    // ========================================================================
5261    // Parse Error Tests
5262    // ========================================================================
5263
5264    #[tokio::test]
5265    async fn test_malformed_json_response() {
5266        // Test handling of malformed JSON from API
5267        use axum::{Router, routing::get};
5268
5269        #[derive(Clone)]
5270        struct MalformedState;
5271
5272        async fn malformed_markets_handler() -> String {
5273            // Invalid JSON - missing closing brace
5274            r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5275        }
5276
5277        let app = Router::new()
5278            .route("/v4/markets", get(malformed_markets_handler))
5279            .with_state(MalformedState);
5280
5281        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5282        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5283        let port = listener.local_addr().unwrap().port();
5284
5285        tokio::spawn(async move {
5286            axum::serve(listener, app).await.unwrap();
5287        });
5288
5289        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5290
5291        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5292        set_data_event_sender(sender);
5293
5294        let client_id = ClientId::from("DYDX-MALFORMED");
5295        let config = DydxDataClientConfig {
5296            base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5297            http_timeout_secs: Some(2),
5298            ..Default::default()
5299        };
5300
5301        let http_client = DydxHttpClient::new(
5302            config.base_url_http.clone(),
5303            config.http_timeout_secs,
5304            config.http_proxy_url.clone(),
5305            config.is_testnet,
5306            None,
5307        )
5308        .unwrap();
5309
5310        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5311
5312        let request = RequestInstruments::new(
5313            None,
5314            None,
5315            Some(client_id),
5316            Some(*DYDX_VENUE),
5317            UUID4::new(),
5318            get_atomic_clock_realtime().get_time_ns(),
5319            None,
5320        );
5321
5322        assert!(client.request_instruments(&request).is_ok());
5323
5324        // Should handle malformed JSON gracefully with empty response
5325        let timeout = tokio::time::Duration::from_secs(3);
5326        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5327            tokio::time::timeout(timeout, rx.recv()).await
5328        {
5329            assert!(
5330                resp.data.is_empty(),
5331                "Expected empty response on malformed JSON"
5332            );
5333        }
5334    }
5335
5336    #[tokio::test]
5337    async fn test_missing_required_fields_in_response() {
5338        // Test handling when API response missing required fields
5339        use axum::{Json, Router, routing::get};
5340        use serde_json::{Value, json};
5341
5342        #[derive(Clone)]
5343        struct MissingFieldsState;
5344
5345        async fn missing_fields_handler() -> Json<Value> {
5346            // Missing critical fields like "ticker", "stepSize", etc.
5347            Json(json!({
5348                "markets": {
5349                    "BTC-USD": {
5350                        // Missing "ticker"
5351                        "status": "ACTIVE",
5352                        "baseAsset": "BTC",
5353                        "quoteAsset": "USD",
5354                        // Missing "stepSize", "tickSize", "minOrderSize"
5355                    }
5356                }
5357            }))
5358        }
5359
5360        let app = Router::new()
5361            .route("/v4/markets", get(missing_fields_handler))
5362            .with_state(MissingFieldsState);
5363
5364        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5365        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5366        let port = listener.local_addr().unwrap().port();
5367
5368        tokio::spawn(async move {
5369            axum::serve(listener, app).await.unwrap();
5370        });
5371
5372        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5373
5374        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5375        set_data_event_sender(sender);
5376
5377        let client_id = ClientId::from("DYDX-MISSING");
5378        let config = DydxDataClientConfig {
5379            base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5380            http_timeout_secs: Some(2),
5381            ..Default::default()
5382        };
5383
5384        let http_client = DydxHttpClient::new(
5385            config.base_url_http.clone(),
5386            config.http_timeout_secs,
5387            config.http_proxy_url.clone(),
5388            config.is_testnet,
5389            None,
5390        )
5391        .unwrap();
5392
5393        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5394
5395        let request = RequestInstruments::new(
5396            None,
5397            None,
5398            Some(client_id),
5399            Some(*DYDX_VENUE),
5400            UUID4::new(),
5401            get_atomic_clock_realtime().get_time_ns(),
5402            None,
5403        );
5404
5405        assert!(client.request_instruments(&request).is_ok());
5406
5407        // Should handle missing fields gracefully (may skip instruments or return empty)
5408        let timeout = tokio::time::Duration::from_secs(3);
5409        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5410            tokio::time::timeout(timeout, rx.recv()).await
5411        {
5412            // Parse errors should result in empty or partial response
5413            // The important part is no panic
5414            assert!(resp.correlation_id == request.request_id);
5415        }
5416    }
5417
5418    #[tokio::test]
5419    async fn test_invalid_data_types_in_response() {
5420        // Test handling when API returns wrong data types
5421        use axum::{Json, Router, routing::get};
5422        use serde_json::{Value, json};
5423
5424        #[derive(Clone)]
5425        struct InvalidTypesState;
5426
5427        async fn invalid_types_handler() -> Json<Value> {
5428            // Wrong data types - strings instead of numbers, etc.
5429            Json(json!({
5430                "markets": {
5431                    "BTC-USD": {
5432                        "ticker": "BTC-USD",
5433                        "status": "ACTIVE",
5434                        "baseAsset": "BTC",
5435                        "quoteAsset": "USD",
5436                        "stepSize": "not_a_number",  // Should be numeric
5437                        "tickSize": true,  // Should be numeric
5438                        "minOrderSize": ["array"],  // Should be numeric
5439                        "market": 12345,  // Should be string
5440                    }
5441                }
5442            }))
5443        }
5444
5445        let app = Router::new()
5446            .route("/v4/markets", get(invalid_types_handler))
5447            .with_state(InvalidTypesState);
5448
5449        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5450        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5451        let port = listener.local_addr().unwrap().port();
5452
5453        tokio::spawn(async move {
5454            axum::serve(listener, app).await.unwrap();
5455        });
5456
5457        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5458
5459        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5460        set_data_event_sender(sender);
5461
5462        let client_id = ClientId::from("DYDX-TYPES");
5463        let config = DydxDataClientConfig {
5464            base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5465            http_timeout_secs: Some(2),
5466            ..Default::default()
5467        };
5468
5469        let http_client = DydxHttpClient::new(
5470            config.base_url_http.clone(),
5471            config.http_timeout_secs,
5472            config.http_proxy_url.clone(),
5473            config.is_testnet,
5474            None,
5475        )
5476        .unwrap();
5477
5478        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5479
5480        let request = RequestInstruments::new(
5481            None,
5482            None,
5483            Some(client_id),
5484            Some(*DYDX_VENUE),
5485            UUID4::new(),
5486            get_atomic_clock_realtime().get_time_ns(),
5487            None,
5488        );
5489
5490        assert!(client.request_instruments(&request).is_ok());
5491
5492        // Should handle type errors gracefully
5493        let timeout = tokio::time::Duration::from_secs(3);
5494        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5495            tokio::time::timeout(timeout, rx.recv()).await
5496        {
5497            // Type mismatch should result in parse failure and empty/partial response
5498            assert!(resp.correlation_id == request.request_id);
5499        }
5500    }
5501
5502    #[tokio::test]
5503    async fn test_unexpected_response_structure() {
5504        // Test handling when API response has completely unexpected structure
5505        use axum::{Json, Router, routing::get};
5506        use serde_json::{Value, json};
5507
5508        #[derive(Clone)]
5509        struct UnexpectedState;
5510
5511        async fn unexpected_structure_handler() -> Json<Value> {
5512            // Completely different structure than expected
5513            Json(json!({
5514                "error": "Something went wrong",
5515                "code": 500,
5516                "data": null,
5517                "unexpected_field": {
5518                    "nested": {
5519                        "deeply": [1, 2, 3]
5520                    }
5521                }
5522            }))
5523        }
5524
5525        let app = Router::new()
5526            .route("/v4/markets", get(unexpected_structure_handler))
5527            .with_state(UnexpectedState);
5528
5529        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5530        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5531        let port = listener.local_addr().unwrap().port();
5532
5533        tokio::spawn(async move {
5534            axum::serve(listener, app).await.unwrap();
5535        });
5536
5537        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5538
5539        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5540        set_data_event_sender(sender);
5541
5542        let client_id = ClientId::from("DYDX-STRUCT");
5543        let config = DydxDataClientConfig {
5544            base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5545            http_timeout_secs: Some(2),
5546            ..Default::default()
5547        };
5548
5549        let http_client = DydxHttpClient::new(
5550            config.base_url_http.clone(),
5551            config.http_timeout_secs,
5552            config.http_proxy_url.clone(),
5553            config.is_testnet,
5554            None,
5555        )
5556        .unwrap();
5557
5558        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5559
5560        let request = RequestInstruments::new(
5561            None,
5562            None,
5563            Some(client_id),
5564            Some(*DYDX_VENUE),
5565            UUID4::new(),
5566            get_atomic_clock_realtime().get_time_ns(),
5567            None,
5568        );
5569
5570        assert!(client.request_instruments(&request).is_ok());
5571
5572        // Should handle unexpected structure gracefully with empty response
5573        let timeout = tokio::time::Duration::from_secs(3);
5574        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5575            tokio::time::timeout(timeout, rx.recv()).await
5576        {
5577            assert!(
5578                resp.data.is_empty(),
5579                "Expected empty response on unexpected structure"
5580            );
5581            assert!(resp.correlation_id == request.request_id);
5582        }
5583    }
5584
5585    #[tokio::test]
5586    async fn test_empty_markets_object_in_response() {
5587        // Test handling when markets object is empty (valid JSON but no data)
5588        use axum::{Json, Router, routing::get};
5589        use serde_json::{Value, json};
5590
5591        #[derive(Clone)]
5592        struct EmptyMarketsState;
5593
5594        async fn empty_markets_handler() -> Json<Value> {
5595            Json(json!({
5596                "markets": {}
5597            }))
5598        }
5599
5600        let app = Router::new()
5601            .route("/v4/markets", get(empty_markets_handler))
5602            .with_state(EmptyMarketsState);
5603
5604        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5605        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5606        let port = listener.local_addr().unwrap().port();
5607
5608        tokio::spawn(async move {
5609            axum::serve(listener, app).await.unwrap();
5610        });
5611
5612        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5613
5614        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5615        set_data_event_sender(sender);
5616
5617        let client_id = ClientId::from("DYDX-EMPTY");
5618        let config = DydxDataClientConfig {
5619            base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5620            http_timeout_secs: Some(2),
5621            ..Default::default()
5622        };
5623
5624        let http_client = DydxHttpClient::new(
5625            config.base_url_http.clone(),
5626            config.http_timeout_secs,
5627            config.http_proxy_url.clone(),
5628            config.is_testnet,
5629            None,
5630        )
5631        .unwrap();
5632
5633        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5634
5635        let request = RequestInstruments::new(
5636            None,
5637            None,
5638            Some(client_id),
5639            Some(*DYDX_VENUE),
5640            UUID4::new(),
5641            get_atomic_clock_realtime().get_time_ns(),
5642            None,
5643        );
5644
5645        assert!(client.request_instruments(&request).is_ok());
5646
5647        // Should handle empty markets gracefully
5648        let timeout = tokio::time::Duration::from_secs(3);
5649        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5650            tokio::time::timeout(timeout, rx.recv()).await
5651        {
5652            assert!(
5653                resp.data.is_empty(),
5654                "Expected empty response for empty markets"
5655            );
5656            assert!(resp.correlation_id == request.request_id);
5657        }
5658    }
5659
5660    #[tokio::test]
5661    async fn test_null_values_in_response() {
5662        // Test handling of null values in critical fields
5663        use axum::{Json, Router, routing::get};
5664        use serde_json::{Value, json};
5665
5666        #[derive(Clone)]
5667        struct NullValuesState;
5668
5669        async fn null_values_handler() -> Json<Value> {
5670            Json(json!({
5671                "markets": {
5672                    "BTC-USD": {
5673                        "ticker": null,
5674                        "status": "ACTIVE",
5675                        "baseAsset": null,
5676                        "quoteAsset": "USD",
5677                        "stepSize": null,
5678                    }
5679                }
5680            }))
5681        }
5682
5683        let app = Router::new()
5684            .route("/v4/markets", get(null_values_handler))
5685            .with_state(NullValuesState);
5686
5687        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5688        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5689        let port = listener.local_addr().unwrap().port();
5690
5691        tokio::spawn(async move {
5692            axum::serve(listener, app).await.unwrap();
5693        });
5694
5695        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5696
5697        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5698        set_data_event_sender(sender);
5699
5700        let client_id = ClientId::from("DYDX-NULL");
5701        let config = DydxDataClientConfig {
5702            base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5703            http_timeout_secs: Some(2),
5704            ..Default::default()
5705        };
5706
5707        let http_client = DydxHttpClient::new(
5708            config.base_url_http.clone(),
5709            config.http_timeout_secs,
5710            config.http_proxy_url.clone(),
5711            config.is_testnet,
5712            None,
5713        )
5714        .unwrap();
5715
5716        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5717
5718        let request = RequestInstruments::new(
5719            None,
5720            None,
5721            Some(client_id),
5722            Some(*DYDX_VENUE),
5723            UUID4::new(),
5724            get_atomic_clock_realtime().get_time_ns(),
5725            None,
5726        );
5727
5728        assert!(client.request_instruments(&request).is_ok());
5729
5730        // Should handle null values gracefully
5731        let timeout = tokio::time::Duration::from_secs(3);
5732        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5733            tokio::time::timeout(timeout, rx.recv()).await
5734        {
5735            // Null values should cause parse failures and result in empty/partial response
5736            assert!(resp.correlation_id == request.request_id);
5737        }
5738    }
5739
5740    // ========================================================================
5741    // Validation Error Tests
5742    // ========================================================================
5743
5744    #[tokio::test]
5745    async fn test_invalid_instrument_id_format() {
5746        // Test handling of non-existent instrument (valid ID format but doesn't exist)
5747        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5748        set_data_event_sender(sender);
5749
5750        let client_id = ClientId::from("DYDX-INVALID-ID");
5751        let config = DydxDataClientConfig::default();
5752
5753        let http_client = DydxHttpClient::new(
5754            config.base_url_http.clone(),
5755            config.http_timeout_secs,
5756            config.http_proxy_url.clone(),
5757            config.is_testnet,
5758            None,
5759        )
5760        .unwrap();
5761
5762        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5763
5764        // Valid format but non-existent instrument
5765        let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5766
5767        let request = RequestInstrument::new(
5768            non_existent_id,
5769            None,
5770            None,
5771            Some(client_id),
5772            UUID4::new(),
5773            get_atomic_clock_realtime().get_time_ns(),
5774            None,
5775        );
5776
5777        assert!(client.request_instrument(&request).is_ok());
5778
5779        // Should handle non-existent instrument gracefully
5780        let timeout = tokio::time::Duration::from_secs(2);
5781        let result = tokio::time::timeout(timeout, rx.recv()).await;
5782
5783        // Either no response or empty response is acceptable for non-existent instrument
5784        match result {
5785            Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5786                // Empty response acceptable
5787            }
5788            Ok(None) | Err(_) => {
5789                // Timeout or no response also acceptable
5790            }
5791            _ => {}
5792        }
5793    }
5794
5795    #[tokio::test]
5796    async fn test_invalid_date_range_end_before_start() {
5797        // Test handling when end date is before start date
5798        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5799        set_data_event_sender(sender);
5800
5801        let client_id = ClientId::from("DYDX-DATE-RANGE");
5802        let config = DydxDataClientConfig::default();
5803
5804        let http_client = DydxHttpClient::new(
5805            config.base_url_http.clone(),
5806            config.http_timeout_secs,
5807            config.http_proxy_url.clone(),
5808            config.is_testnet,
5809            None,
5810        )
5811        .unwrap();
5812
5813        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5814
5815        let instrument = create_test_instrument_any();
5816        let instrument_id = instrument.id();
5817        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5818        client.instruments.insert(symbol_key, instrument);
5819
5820        // Invalid date range: end is before start
5821        let start = chrono::Utc::now();
5822        let end = start - chrono::Duration::hours(24); // End is 24 hours before start
5823
5824        let request = RequestTrades::new(
5825            instrument_id,
5826            Some(start),
5827            Some(end),
5828            None,
5829            Some(client_id),
5830            UUID4::new(),
5831            get_atomic_clock_realtime().get_time_ns(),
5832            None,
5833        );
5834
5835        assert!(client.request_trades(&request).is_ok());
5836
5837        // Should handle invalid range gracefully - may return empty or no response
5838        let timeout = tokio::time::Duration::from_secs(2);
5839        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5840            tokio::time::timeout(timeout, rx.recv()).await
5841        {
5842            // Empty response expected for invalid date range
5843            assert!(resp.correlation_id == request.request_id);
5844        }
5845    }
5846
5847    #[tokio::test]
5848    async fn test_negative_limit_value() {
5849        // Test handling of limit edge cases
5850        // Note: Rust's NonZeroUsize prevents negative/zero values at type level
5851        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5852        set_data_event_sender(sender);
5853
5854        let client_id = ClientId::from("DYDX-NEG-LIMIT");
5855        let config = DydxDataClientConfig::default();
5856
5857        let http_client = DydxHttpClient::new(
5858            config.base_url_http.clone(),
5859            config.http_timeout_secs,
5860            config.http_proxy_url.clone(),
5861            config.is_testnet,
5862            None,
5863        )
5864        .unwrap();
5865
5866        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5867
5868        let instrument = create_test_instrument_any();
5869        let instrument_id = instrument.id();
5870        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5871        client.instruments.insert(symbol_key, instrument);
5872
5873        // Minimum valid limit (1)
5874        let request = RequestTrades::new(
5875            instrument_id,
5876            None,
5877            None,
5878            std::num::NonZeroUsize::new(1), // Minimum valid value
5879            Some(client_id),
5880            UUID4::new(),
5881            get_atomic_clock_realtime().get_time_ns(),
5882            None,
5883        );
5884
5885        // Should not panic with minimum limit
5886        assert!(client.request_trades(&request).is_ok());
5887    }
5888
5889    #[tokio::test]
5890    async fn test_zero_limit_value() {
5891        // Test handling of no limit (None = use API default)
5892        // Note: NonZeroUsize type prevents actual zero, so None represents "no limit"
5893        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5894        set_data_event_sender(sender);
5895
5896        let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5897        let config = DydxDataClientConfig::default();
5898
5899        let http_client = DydxHttpClient::new(
5900            config.base_url_http.clone(),
5901            config.http_timeout_secs,
5902            config.http_proxy_url.clone(),
5903            config.is_testnet,
5904            None,
5905        )
5906        .unwrap();
5907
5908        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5909
5910        let instrument = create_test_instrument_any();
5911        let instrument_id = instrument.id();
5912        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5913        client.instruments.insert(symbol_key, instrument);
5914
5915        let request = RequestTrades::new(
5916            instrument_id,
5917            None,
5918            None,
5919            None, // No limit specified (None = use default)
5920            Some(client_id),
5921            UUID4::new(),
5922            get_atomic_clock_realtime().get_time_ns(),
5923            None,
5924        );
5925
5926        assert!(client.request_trades(&request).is_ok());
5927
5928        // Should handle None limit gracefully (uses API default)
5929        let timeout = tokio::time::Duration::from_secs(2);
5930        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5931            tokio::time::timeout(timeout, rx.recv()).await
5932        {
5933            assert!(resp.correlation_id == request.request_id);
5934        }
5935    }
5936
5937    #[tokio::test]
5938    async fn test_very_large_limit_value() {
5939        // Test handling of extremely large limit values (boundary testing)
5940        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5941        set_data_event_sender(sender);
5942
5943        let client_id = ClientId::from("DYDX-LARGE-LIMIT");
5944        let config = DydxDataClientConfig::default();
5945
5946        let http_client = DydxHttpClient::new(
5947            config.base_url_http.clone(),
5948            config.http_timeout_secs,
5949            config.http_proxy_url.clone(),
5950            config.is_testnet,
5951            None,
5952        )
5953        .unwrap();
5954
5955        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5956
5957        let instrument = create_test_instrument_any();
5958        let instrument_id = instrument.id();
5959        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5960        client.instruments.insert(symbol_key, instrument);
5961
5962        let request = RequestTrades::new(
5963            instrument_id,
5964            None,
5965            None,
5966            std::num::NonZeroUsize::new(1_000_000), // Very large limit
5967            Some(client_id),
5968            UUID4::new(),
5969            get_atomic_clock_realtime().get_time_ns(),
5970            None,
5971        );
5972
5973        // Should not panic with very large limit
5974        assert!(client.request_trades(&request).is_ok());
5975
5976        // Should handle large limit gracefully
5977        let timeout = tokio::time::Duration::from_secs(2);
5978        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5979            tokio::time::timeout(timeout, rx.recv()).await
5980        {
5981            assert!(resp.correlation_id == request.request_id);
5982        }
5983    }
5984
5985    #[tokio::test]
5986    async fn test_none_limit_uses_default() {
5987        // Test that None limit falls back to default behavior
5988        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5989        set_data_event_sender(sender);
5990
5991        let client_id = ClientId::from("DYDX-NONE-LIMIT");
5992        let config = DydxDataClientConfig::default();
5993
5994        let http_client = DydxHttpClient::new(
5995            config.base_url_http.clone(),
5996            config.http_timeout_secs,
5997            config.http_proxy_url.clone(),
5998            config.is_testnet,
5999            None,
6000        )
6001        .unwrap();
6002
6003        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6004
6005        let instrument = create_test_instrument_any();
6006        let instrument_id = instrument.id();
6007        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6008        client.instruments.insert(symbol_key, instrument);
6009
6010        let request = RequestTrades::new(
6011            instrument_id,
6012            None,
6013            None,
6014            None, // No limit specified
6015            Some(client_id),
6016            UUID4::new(),
6017            get_atomic_clock_realtime().get_time_ns(),
6018            None,
6019        );
6020
6021        // Should work fine with None limit (uses API default)
6022        assert!(client.request_trades(&request).is_ok());
6023
6024        let timeout = tokio::time::Duration::from_secs(2);
6025        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6026            tokio::time::timeout(timeout, rx.recv()).await
6027        {
6028            assert!(resp.correlation_id == request.request_id);
6029        }
6030    }
6031
6032    #[tokio::test]
6033    async fn test_validation_does_not_panic() {
6034        // Test that various validation edge cases don't cause panics
6035        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6036        set_data_event_sender(sender);
6037
6038        let client_id = ClientId::from("DYDX-VALIDATION");
6039        let config = DydxDataClientConfig::default();
6040
6041        let http_client = DydxHttpClient::new(
6042            config.base_url_http.clone(),
6043            config.http_timeout_secs,
6044            config.http_proxy_url.clone(),
6045            config.is_testnet,
6046            None,
6047        )
6048        .unwrap();
6049
6050        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6051
6052        let instrument = create_test_instrument_any();
6053        let instrument_id = instrument.id();
6054        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6055        client.instruments.insert(symbol_key, instrument);
6056
6057        // Test 1: Invalid instrument ID
6058        let invalid_id = InstrumentId::from("INVALID.WRONG");
6059        let req1 = RequestInstrument::new(
6060            invalid_id,
6061            None,
6062            None,
6063            Some(client_id),
6064            UUID4::new(),
6065            get_atomic_clock_realtime().get_time_ns(),
6066            None,
6067        );
6068        assert!(client.request_instrument(&req1).is_ok());
6069
6070        // Test 2: Invalid date range
6071        let start = chrono::Utc::now();
6072        let end = start - chrono::Duration::hours(1);
6073        let req2 = RequestTrades::new(
6074            instrument_id,
6075            Some(start),
6076            Some(end),
6077            None,
6078            Some(client_id),
6079            UUID4::new(),
6080            get_atomic_clock_realtime().get_time_ns(),
6081            None,
6082        );
6083        assert!(client.request_trades(&req2).is_ok());
6084
6085        // Test 3: Minimum limit (1)
6086        let req3 = RequestTrades::new(
6087            instrument_id,
6088            None,
6089            None,
6090            std::num::NonZeroUsize::new(1),
6091            Some(client_id),
6092            UUID4::new(),
6093            get_atomic_clock_realtime().get_time_ns(),
6094            None,
6095        );
6096        assert!(client.request_trades(&req3).is_ok());
6097
6098        // Test 4: Very large limit
6099        let req4 = RequestTrades::new(
6100            instrument_id,
6101            None,
6102            None,
6103            std::num::NonZeroUsize::new(usize::MAX),
6104            Some(client_id),
6105            UUID4::new(),
6106            get_atomic_clock_realtime().get_time_ns(),
6107            None,
6108        );
6109        assert!(client.request_trades(&req4).is_ok());
6110
6111        // All validation edge cases handled without panic
6112    }
6113
6114    // ========================================================================
6115    // Response Format Verification Tests - InstrumentsResponse
6116    // ========================================================================
6117
6118    #[tokio::test]
6119    async fn test_instruments_response_has_correct_venue() {
6120        // Verify InstrumentsResponse includes correct DYDX venue
6121        use axum::{Json, Router, routing::get};
6122        use serde_json::{Value, json};
6123
6124        #[derive(Clone)]
6125        struct VenueTestState;
6126
6127        async fn venue_handler() -> Json<Value> {
6128            Json(json!({
6129                "markets": {
6130                    "BTC-USD": {
6131                        "ticker": "BTC-USD",
6132                        "status": "ACTIVE",
6133                        "baseAsset": "BTC",
6134                        "quoteAsset": "USD",
6135                        "stepSize": "0.0001",
6136                        "tickSize": "1",
6137                        "indexPrice": "50000",
6138                        "oraclePrice": "50000",
6139                        "priceChange24H": "1000",
6140                        "nextFundingRate": "0.0001",
6141                        "nextFundingAt": "2024-01-01T00:00:00.000Z",
6142                        "minOrderSize": "0.001",
6143                        "type": "PERPETUAL",
6144                        "initialMarginFraction": "0.05",
6145                        "maintenanceMarginFraction": "0.03",
6146                        "volume24H": "1000000",
6147                        "trades24H": "10000",
6148                        "openInterest": "5000000",
6149                        "incrementalInitialMarginFraction": "0.01",
6150                        "incrementalPositionSize": "10",
6151                        "maxPositionSize": "1000",
6152                        "baselinePositionSize": "100",
6153                        "assetResolution": "10000000000"
6154                    }
6155                }
6156            }))
6157        }
6158
6159        let app = Router::new()
6160            .route("/v4/markets", get(venue_handler))
6161            .with_state(VenueTestState);
6162
6163        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
6164        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
6165        let port = listener.local_addr().unwrap().port();
6166
6167        tokio::spawn(async move {
6168            axum::serve(listener, app).await.unwrap();
6169        });
6170
6171        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
6172
6173        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6174        set_data_event_sender(sender);
6175
6176        let client_id = ClientId::from("DYDX-VENUE-TEST");
6177        let config = DydxDataClientConfig {
6178            base_url_http: Some(format!("http://127.0.0.1:{}", port)),
6179            http_timeout_secs: Some(2),
6180            ..Default::default()
6181        };
6182
6183        let http_client = DydxHttpClient::new(
6184            config.base_url_http.clone(),
6185            config.http_timeout_secs,
6186            config.http_proxy_url.clone(),
6187            config.is_testnet,
6188            None,
6189        )
6190        .unwrap();
6191
6192        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6193
6194        let request = RequestInstruments::new(
6195            None,
6196            None,
6197            Some(client_id),
6198            Some(*DYDX_VENUE),
6199            UUID4::new(),
6200            get_atomic_clock_realtime().get_time_ns(),
6201            None,
6202        );
6203
6204        assert!(client.request_instruments(&request).is_ok());
6205
6206        let timeout = tokio::time::Duration::from_secs(3);
6207        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6208            tokio::time::timeout(timeout, rx.recv()).await
6209        {
6210            // Verify venue is DYDX
6211            assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6212        }
6213    }
6214
6215    #[tokio::test]
6216    async fn test_instruments_response_contains_vec_instrument_any() {
6217        // Verify InstrumentsResponse contains Vec<InstrumentAny>
6218        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6219        set_data_event_sender(sender);
6220
6221        let client_id = ClientId::from("DYDX-VEC-TEST");
6222        let config = DydxDataClientConfig::default();
6223
6224        let http_client = DydxHttpClient::new(
6225            config.base_url_http.clone(),
6226            config.http_timeout_secs,
6227            config.http_proxy_url.clone(),
6228            config.is_testnet,
6229            None,
6230        )
6231        .unwrap();
6232
6233        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6234
6235        let request = RequestInstruments::new(
6236            None,
6237            None,
6238            Some(client_id),
6239            Some(*DYDX_VENUE),
6240            UUID4::new(),
6241            get_atomic_clock_realtime().get_time_ns(),
6242            None,
6243        );
6244
6245        assert!(client.request_instruments(&request).is_ok());
6246
6247        let timeout = tokio::time::Duration::from_secs(2);
6248        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6249            tokio::time::timeout(timeout, rx.recv()).await
6250        {
6251            // Verify data is Vec<InstrumentAny>
6252            assert!(
6253                resp.data.is_empty() || !resp.data.is_empty(),
6254                "data should be Vec<InstrumentAny>"
6255            );
6256        }
6257    }
6258
6259    #[tokio::test]
6260    async fn test_instruments_response_includes_correlation_id() {
6261        // Verify InstrumentsResponse includes correlation_id matching request_id
6262        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6263        set_data_event_sender(sender);
6264
6265        let client_id = ClientId::from("DYDX-CORR-TEST");
6266        let config = DydxDataClientConfig::default();
6267
6268        let http_client = DydxHttpClient::new(
6269            config.base_url_http.clone(),
6270            config.http_timeout_secs,
6271            config.http_proxy_url.clone(),
6272            config.is_testnet,
6273            None,
6274        )
6275        .unwrap();
6276
6277        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6278
6279        let request_id = UUID4::new();
6280        let request = RequestInstruments::new(
6281            None,
6282            None,
6283            Some(client_id),
6284            Some(*DYDX_VENUE),
6285            request_id,
6286            get_atomic_clock_realtime().get_time_ns(),
6287            None,
6288        );
6289
6290        assert!(client.request_instruments(&request).is_ok());
6291
6292        let timeout = tokio::time::Duration::from_secs(2);
6293        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6294            tokio::time::timeout(timeout, rx.recv()).await
6295        {
6296            // Verify correlation_id matches request_id
6297            assert_eq!(
6298                resp.correlation_id, request_id,
6299                "correlation_id should match request_id"
6300            );
6301        }
6302    }
6303
6304    #[tokio::test]
6305    async fn test_instruments_response_includes_client_id() {
6306        // Verify InstrumentsResponse includes client_id
6307        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6308        set_data_event_sender(sender);
6309
6310        let client_id = ClientId::from("DYDX-CLIENT-TEST");
6311        let config = DydxDataClientConfig::default();
6312
6313        let http_client = DydxHttpClient::new(
6314            config.base_url_http.clone(),
6315            config.http_timeout_secs,
6316            config.http_proxy_url.clone(),
6317            config.is_testnet,
6318            None,
6319        )
6320        .unwrap();
6321
6322        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6323
6324        let request = RequestInstruments::new(
6325            None,
6326            None,
6327            Some(client_id),
6328            Some(*DYDX_VENUE),
6329            UUID4::new(),
6330            get_atomic_clock_realtime().get_time_ns(),
6331            None,
6332        );
6333
6334        assert!(client.request_instruments(&request).is_ok());
6335
6336        let timeout = tokio::time::Duration::from_secs(2);
6337        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6338            tokio::time::timeout(timeout, rx.recv()).await
6339        {
6340            // Verify client_id is included
6341            assert_eq!(
6342                resp.client_id, client_id,
6343                "client_id should be included in response"
6344            );
6345        }
6346    }
6347
6348    #[tokio::test]
6349    async fn test_instruments_response_includes_timestamps() {
6350        // Verify InstrumentsResponse includes start, end, and ts_init timestamps
6351        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6352        set_data_event_sender(sender);
6353
6354        let client_id = ClientId::from("DYDX-TS-TEST");
6355        let config = DydxDataClientConfig::default();
6356
6357        let http_client = DydxHttpClient::new(
6358            config.base_url_http.clone(),
6359            config.http_timeout_secs,
6360            config.http_proxy_url.clone(),
6361            config.is_testnet,
6362            None,
6363        )
6364        .unwrap();
6365
6366        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6367
6368        let start = Some(chrono::Utc::now() - chrono::Duration::days(1));
6369        let end = Some(chrono::Utc::now());
6370        let ts_init = get_atomic_clock_realtime().get_time_ns();
6371
6372        let request = RequestInstruments::new(
6373            start,
6374            end,
6375            Some(client_id),
6376            Some(*DYDX_VENUE),
6377            UUID4::new(),
6378            ts_init,
6379            None,
6380        );
6381
6382        assert!(client.request_instruments(&request).is_ok());
6383
6384        let timeout = tokio::time::Duration::from_secs(2);
6385        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6386            tokio::time::timeout(timeout, rx.recv()).await
6387        {
6388            // Verify timestamps are included
6389            assert!(
6390                resp.start.is_some() || resp.start.is_none(),
6391                "start timestamp field exists"
6392            );
6393            assert!(
6394                resp.end.is_some() || resp.end.is_none(),
6395                "end timestamp field exists"
6396            );
6397            assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6398        }
6399    }
6400
6401    #[tokio::test]
6402    async fn test_instruments_response_includes_params_when_provided() {
6403        // Verify InstrumentsResponse includes params when provided in request
6404        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6405        set_data_event_sender(sender);
6406
6407        let client_id = ClientId::from("DYDX-PARAMS-TEST");
6408        let config = DydxDataClientConfig::default();
6409
6410        let http_client = DydxHttpClient::new(
6411            config.base_url_http.clone(),
6412            config.http_timeout_secs,
6413            config.http_proxy_url.clone(),
6414            config.is_testnet,
6415            None,
6416        )
6417        .unwrap();
6418
6419        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6420
6421        // Since we can't easily create IndexMap in tests without importing,
6422        // just verify the params field exists by passing None
6423        let request = RequestInstruments::new(
6424            None,
6425            None,
6426            Some(client_id),
6427            Some(*DYDX_VENUE),
6428            UUID4::new(),
6429            get_atomic_clock_realtime().get_time_ns(),
6430            None, // params
6431        );
6432
6433        assert!(client.request_instruments(&request).is_ok());
6434
6435        let timeout = tokio::time::Duration::from_secs(2);
6436        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6437            tokio::time::timeout(timeout, rx.recv()).await
6438        {
6439            // Verify params field exists (structure validation)
6440            let _params = resp.params;
6441        }
6442    }
6443
6444    #[tokio::test]
6445    async fn test_instruments_response_params_none_when_not_provided() {
6446        // Verify InstrumentsResponse params is None when not provided
6447        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6448        set_data_event_sender(sender);
6449
6450        let client_id = ClientId::from("DYDX-NO-PARAMS");
6451        let config = DydxDataClientConfig::default();
6452
6453        let http_client = DydxHttpClient::new(
6454            config.base_url_http.clone(),
6455            config.http_timeout_secs,
6456            config.http_proxy_url.clone(),
6457            config.is_testnet,
6458            None,
6459        )
6460        .unwrap();
6461
6462        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6463
6464        let request = RequestInstruments::new(
6465            None,
6466            None,
6467            Some(client_id),
6468            Some(*DYDX_VENUE),
6469            UUID4::new(),
6470            get_atomic_clock_realtime().get_time_ns(),
6471            None, // No params
6472        );
6473
6474        assert!(client.request_instruments(&request).is_ok());
6475
6476        let timeout = tokio::time::Duration::from_secs(2);
6477        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6478            tokio::time::timeout(timeout, rx.recv()).await
6479        {
6480            // Verify params field exists and is None when not provided
6481            assert!(
6482                resp.params.is_none(),
6483                "params should be None when not provided"
6484            );
6485        }
6486    }
6487
6488    #[tokio::test]
6489    async fn test_instruments_response_complete_structure() {
6490        // Comprehensive test verifying all InstrumentsResponse fields
6491        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6492        set_data_event_sender(sender);
6493
6494        let client_id = ClientId::from("DYDX-FULL-TEST");
6495        let config = DydxDataClientConfig::default();
6496
6497        let http_client = DydxHttpClient::new(
6498            config.base_url_http.clone(),
6499            config.http_timeout_secs,
6500            config.http_proxy_url.clone(),
6501            config.is_testnet,
6502            None,
6503        )
6504        .unwrap();
6505
6506        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6507
6508        let request_id = UUID4::new();
6509        let start = Some(chrono::Utc::now() - chrono::Duration::hours(1));
6510        let end = Some(chrono::Utc::now());
6511        let ts_init = get_atomic_clock_realtime().get_time_ns();
6512
6513        let request = RequestInstruments::new(
6514            start,
6515            end,
6516            Some(client_id),
6517            Some(*DYDX_VENUE),
6518            request_id,
6519            ts_init,
6520            None,
6521        );
6522
6523        assert!(client.request_instruments(&request).is_ok());
6524
6525        let timeout = tokio::time::Duration::from_secs(2);
6526        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6527            tokio::time::timeout(timeout, rx.recv()).await
6528        {
6529            // Comprehensive validation of all fields
6530            assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6531            assert_eq!(
6532                resp.correlation_id, request_id,
6533                "correlation_id should match"
6534            );
6535            assert_eq!(resp.client_id, client_id, "client_id should match");
6536            assert!(resp.ts_init > 0, "ts_init should be set");
6537
6538            // data field exists (Vec<InstrumentAny>)
6539            let _data: Vec<InstrumentAny> = resp.data;
6540
6541            // Timestamp fields can be present or None
6542            let _start = resp.start;
6543            let _end = resp.end;
6544            let _params = resp.params;
6545        }
6546    }
6547
6548    // ========================================================================
6549    // Response Format Verification Tests - InstrumentResponse
6550    // ========================================================================
6551
6552    #[tokio::test]
6553    async fn test_instrument_response_properly_boxed() {
6554        // Verify InstrumentResponse is properly boxed in DataResponse
6555        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6556        set_data_event_sender(sender);
6557
6558        let client_id = ClientId::from("DYDX-BOXED-TEST");
6559        let config = DydxDataClientConfig::default();
6560
6561        let http_client = DydxHttpClient::new(
6562            config.base_url_http.clone(),
6563            config.http_timeout_secs,
6564            config.http_proxy_url.clone(),
6565            config.is_testnet,
6566            None,
6567        )
6568        .unwrap();
6569
6570        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6571
6572        let instrument = create_test_instrument_any();
6573        let instrument_id = instrument.id();
6574        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6575        client.instruments.insert(symbol_key, instrument);
6576
6577        let request = RequestInstrument::new(
6578            instrument_id,
6579            None,
6580            None,
6581            Some(client_id),
6582            UUID4::new(),
6583            get_atomic_clock_realtime().get_time_ns(),
6584            None,
6585        );
6586
6587        assert!(client.request_instrument(&request).is_ok());
6588
6589        let timeout = tokio::time::Duration::from_secs(2);
6590        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6591            tokio::time::timeout(timeout, rx.recv()).await
6592        {
6593            // Verify it's boxed - we receive Box<InstrumentResponse>
6594            let _response: Box<InstrumentResponse> = boxed_resp;
6595            // Successfully matched boxed pattern
6596        }
6597    }
6598
6599    #[tokio::test]
6600    async fn test_instrument_response_contains_single_instrument() {
6601        // Verify InstrumentResponse contains single InstrumentAny
6602        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6603        set_data_event_sender(sender);
6604
6605        let client_id = ClientId::from("DYDX-SINGLE-TEST");
6606        let config = DydxDataClientConfig::default();
6607
6608        let http_client = DydxHttpClient::new(
6609            config.base_url_http.clone(),
6610            config.http_timeout_secs,
6611            config.http_proxy_url.clone(),
6612            config.is_testnet,
6613            None,
6614        )
6615        .unwrap();
6616
6617        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6618
6619        let instrument = create_test_instrument_any();
6620        let instrument_id = instrument.id();
6621        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6622        client.instruments.insert(symbol_key, instrument.clone());
6623
6624        let request = RequestInstrument::new(
6625            instrument_id,
6626            None,
6627            None,
6628            Some(client_id),
6629            UUID4::new(),
6630            get_atomic_clock_realtime().get_time_ns(),
6631            None,
6632        );
6633
6634        assert!(client.request_instrument(&request).is_ok());
6635
6636        let timeout = tokio::time::Duration::from_secs(2);
6637        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6638            tokio::time::timeout(timeout, rx.recv()).await
6639        {
6640            // Verify data contains single InstrumentAny
6641            let _instrument: InstrumentAny = resp.data;
6642            // Successfully matched InstrumentAny type
6643        }
6644    }
6645
6646    #[tokio::test]
6647    async fn test_instrument_response_has_correct_instrument_id() {
6648        // Verify InstrumentResponse has correct instrument_id
6649        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6650        set_data_event_sender(sender);
6651
6652        let client_id = ClientId::from("DYDX-ID-TEST");
6653        let config = DydxDataClientConfig::default();
6654
6655        let http_client = DydxHttpClient::new(
6656            config.base_url_http.clone(),
6657            config.http_timeout_secs,
6658            config.http_proxy_url.clone(),
6659            config.is_testnet,
6660            None,
6661        )
6662        .unwrap();
6663
6664        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6665
6666        let instrument = create_test_instrument_any();
6667        let instrument_id = instrument.id();
6668        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6669        client.instruments.insert(symbol_key, instrument);
6670
6671        let request = RequestInstrument::new(
6672            instrument_id,
6673            None,
6674            None,
6675            Some(client_id),
6676            UUID4::new(),
6677            get_atomic_clock_realtime().get_time_ns(),
6678            None,
6679        );
6680
6681        assert!(client.request_instrument(&request).is_ok());
6682
6683        let timeout = tokio::time::Duration::from_secs(2);
6684        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6685            tokio::time::timeout(timeout, rx.recv()).await
6686        {
6687            // Verify instrument_id matches request
6688            assert_eq!(
6689                resp.instrument_id, instrument_id,
6690                "instrument_id should match requested ID"
6691            );
6692        }
6693    }
6694
6695    #[tokio::test]
6696    async fn test_instrument_response_includes_metadata() {
6697        // Verify InstrumentResponse includes all metadata fields
6698        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6699        set_data_event_sender(sender);
6700
6701        let client_id = ClientId::from("DYDX-META-TEST");
6702        let config = DydxDataClientConfig::default();
6703
6704        let http_client = DydxHttpClient::new(
6705            config.base_url_http.clone(),
6706            config.http_timeout_secs,
6707            config.http_proxy_url.clone(),
6708            config.is_testnet,
6709            None,
6710        )
6711        .unwrap();
6712
6713        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6714
6715        let instrument = create_test_instrument_any();
6716        let instrument_id = instrument.id();
6717        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6718        client.instruments.insert(symbol_key, instrument);
6719
6720        let request_id = UUID4::new();
6721        let start = Some(chrono::Utc::now() - chrono::Duration::hours(1));
6722        let end = Some(chrono::Utc::now());
6723        let ts_init = get_atomic_clock_realtime().get_time_ns();
6724
6725        let request = RequestInstrument::new(
6726            instrument_id,
6727            start,
6728            end,
6729            Some(client_id),
6730            request_id,
6731            ts_init,
6732            None,
6733        );
6734
6735        assert!(client.request_instrument(&request).is_ok());
6736
6737        let timeout = tokio::time::Duration::from_secs(2);
6738        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6739            tokio::time::timeout(timeout, rx.recv()).await
6740        {
6741            // Verify all metadata fields
6742            assert_eq!(
6743                resp.correlation_id, request_id,
6744                "correlation_id should match"
6745            );
6746            assert_eq!(resp.client_id, client_id, "client_id should match");
6747            assert!(resp.ts_init > 0, "ts_init should be set");
6748
6749            // Timestamp fields exist (can be Some or None)
6750            let _start = resp.start;
6751            let _end = resp.end;
6752
6753            // Params field exists
6754            let _params = resp.params;
6755        }
6756    }
6757
6758    #[tokio::test]
6759    async fn test_instrument_response_matches_requested_instrument() {
6760        // Verify InstrumentResponse data matches the requested instrument exactly
6761        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6762        set_data_event_sender(sender);
6763
6764        let client_id = ClientId::from("DYDX-MATCH-TEST");
6765        let config = DydxDataClientConfig::default();
6766
6767        let http_client = DydxHttpClient::new(
6768            config.base_url_http.clone(),
6769            config.http_timeout_secs,
6770            config.http_proxy_url.clone(),
6771            config.is_testnet,
6772            None,
6773        )
6774        .unwrap();
6775
6776        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6777
6778        let instrument = create_test_instrument_any();
6779        let instrument_id = instrument.id();
6780        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6781        client.instruments.insert(symbol_key, instrument.clone());
6782
6783        let request = RequestInstrument::new(
6784            instrument_id,
6785            None,
6786            None,
6787            Some(client_id),
6788            UUID4::new(),
6789            get_atomic_clock_realtime().get_time_ns(),
6790            None,
6791        );
6792
6793        assert!(client.request_instrument(&request).is_ok());
6794
6795        let timeout = tokio::time::Duration::from_secs(2);
6796        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6797            tokio::time::timeout(timeout, rx.recv()).await
6798        {
6799            // Verify returned instrument matches requested instrument
6800            assert_eq!(
6801                resp.data.id(),
6802                instrument_id,
6803                "Returned instrument should match requested"
6804            );
6805            assert_eq!(
6806                resp.instrument_id, instrument_id,
6807                "instrument_id field should match"
6808            );
6809
6810            // Both should point to the same instrument
6811            assert_eq!(
6812                resp.data.id(),
6813                resp.instrument_id,
6814                "data.id() should match instrument_id field"
6815            );
6816        }
6817    }
6818
6819    #[tokio::test]
6820    async fn test_instrument_response_complete_structure() {
6821        // Comprehensive test verifying all InstrumentResponse fields
6822        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6823        set_data_event_sender(sender);
6824
6825        let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6826        let config = DydxDataClientConfig::default();
6827
6828        let http_client = DydxHttpClient::new(
6829            config.base_url_http.clone(),
6830            config.http_timeout_secs,
6831            config.http_proxy_url.clone(),
6832            config.is_testnet,
6833            None,
6834        )
6835        .unwrap();
6836
6837        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6838
6839        let instrument = create_test_instrument_any();
6840        let instrument_id = instrument.id();
6841        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6842        client.instruments.insert(symbol_key, instrument.clone());
6843
6844        let request_id = UUID4::new();
6845        let ts_init = get_atomic_clock_realtime().get_time_ns();
6846
6847        let request = RequestInstrument::new(
6848            instrument_id,
6849            None,
6850            None,
6851            Some(client_id),
6852            request_id,
6853            ts_init,
6854            None,
6855        );
6856
6857        assert!(client.request_instrument(&request).is_ok());
6858
6859        let timeout = tokio::time::Duration::from_secs(2);
6860        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6861            tokio::time::timeout(timeout, rx.recv()).await
6862        {
6863            // Comprehensive validation
6864            // 1. Boxed structure
6865            let _boxed: Box<InstrumentResponse> = resp.clone();
6866
6867            // 2. All required fields present
6868            assert_eq!(resp.correlation_id, request_id);
6869            assert_eq!(resp.client_id, client_id);
6870            assert_eq!(resp.instrument_id, instrument_id);
6871            assert!(resp.ts_init > 0);
6872
6873            // 3. Data field contains InstrumentAny
6874            let returned_instrument: InstrumentAny = resp.data;
6875            assert_eq!(returned_instrument.id(), instrument_id);
6876
6877            // 4. Optional fields exist
6878            let _start = resp.start;
6879            let _end = resp.end;
6880            let _params = resp.params;
6881        }
6882    }
6883
6884    // ========================================================================
6885    // TradesResponse Format Verification Tests
6886    // ========================================================================
6887
6888    #[tokio::test]
6889    async fn test_trades_response_contains_vec_trade_tick() {
6890        // Verify TradesResponse.data is Vec<TradeTick>
6891        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6892        set_data_event_sender(sender);
6893
6894        let created_at = chrono::Utc::now();
6895        let http_trades = vec![
6896            crate::http::models::Trade {
6897                id: "trade-1".to_string(),
6898                side: OrderSide::Buy,
6899                size: dec!(1.0),
6900                price: dec!(100.0),
6901                created_at,
6902                created_at_height: 100,
6903                trade_type: crate::common::enums::DydxTradeType::Limit,
6904            },
6905            crate::http::models::Trade {
6906                id: "trade-2".to_string(),
6907                side: OrderSide::Sell,
6908                size: dec!(2.0),
6909                price: dec!(101.0),
6910                created_at: created_at + chrono::Duration::seconds(1),
6911                created_at_height: 101,
6912                trade_type: crate::common::enums::DydxTradeType::Limit,
6913            },
6914        ];
6915
6916        let trades_response = crate::http::models::TradesResponse {
6917            trades: http_trades,
6918        };
6919
6920        let state = TradesTestState {
6921            response: Arc::new(trades_response),
6922            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
6923            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
6924        };
6925
6926        let addr = start_trades_test_server(state).await;
6927        let base_url = format!("http://{}", addr);
6928
6929        let client_id = ClientId::from("DYDX-VEC-TEST");
6930        let config = DydxDataClientConfig {
6931            base_url_http: Some(base_url),
6932            is_testnet: true,
6933            ..Default::default()
6934        };
6935
6936        let http_client = DydxHttpClient::new(
6937            config.base_url_http.clone(),
6938            config.http_timeout_secs,
6939            config.http_proxy_url.clone(),
6940            config.is_testnet,
6941            None,
6942        )
6943        .unwrap();
6944
6945        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6946
6947        let instrument = create_test_instrument_any();
6948        let instrument_id = instrument.id();
6949        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6950        client.instruments.insert(symbol_key, instrument);
6951
6952        let request = RequestTrades::new(
6953            instrument_id,
6954            None,
6955            None,
6956            None,
6957            Some(client_id),
6958            UUID4::new(),
6959            get_atomic_clock_realtime().get_time_ns(),
6960            None,
6961        );
6962
6963        assert!(client.request_trades(&request).is_ok());
6964
6965        let timeout = tokio::time::Duration::from_millis(500);
6966        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6967            tokio::time::timeout(timeout, rx.recv()).await
6968        {
6969            // Verify data is Vec<TradeTick>
6970            let trade_ticks: Vec<TradeTick> = resp.data;
6971            assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
6972
6973            // Each element is a TradeTick
6974            for tick in trade_ticks.iter() {
6975                assert_eq!(tick.instrument_id, instrument_id);
6976            }
6977        }
6978    }
6979
6980    #[tokio::test]
6981    async fn test_trades_response_has_correct_instrument_id() {
6982        // Verify TradesResponse.instrument_id matches request
6983        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6984        set_data_event_sender(sender);
6985
6986        let created_at = chrono::Utc::now();
6987        let http_trade = crate::http::models::Trade {
6988            id: "instrument-id-test".to_string(),
6989            side: OrderSide::Buy,
6990            size: dec!(1.0),
6991            price: dec!(100.0),
6992            created_at,
6993            created_at_height: 100,
6994            trade_type: crate::common::enums::DydxTradeType::Limit,
6995        };
6996
6997        let trades_response = crate::http::models::TradesResponse {
6998            trades: vec![http_trade],
6999        };
7000
7001        let state = TradesTestState {
7002            response: Arc::new(trades_response),
7003            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7004            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7005        };
7006
7007        let addr = start_trades_test_server(state).await;
7008        let base_url = format!("http://{}", addr);
7009
7010        let client_id = ClientId::from("DYDX-INSTID-TEST");
7011        let config = DydxDataClientConfig {
7012            base_url_http: Some(base_url),
7013            is_testnet: true,
7014            ..Default::default()
7015        };
7016
7017        let http_client = DydxHttpClient::new(
7018            config.base_url_http.clone(),
7019            config.http_timeout_secs,
7020            config.http_proxy_url.clone(),
7021            config.is_testnet,
7022            None,
7023        )
7024        .unwrap();
7025
7026        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7027
7028        let instrument = create_test_instrument_any();
7029        let instrument_id = instrument.id();
7030        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7031        client.instruments.insert(symbol_key, instrument);
7032
7033        let request = RequestTrades::new(
7034            instrument_id,
7035            None,
7036            None,
7037            None,
7038            Some(client_id),
7039            UUID4::new(),
7040            get_atomic_clock_realtime().get_time_ns(),
7041            None,
7042        );
7043
7044        assert!(client.request_trades(&request).is_ok());
7045
7046        let timeout = tokio::time::Duration::from_millis(500);
7047        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7048            tokio::time::timeout(timeout, rx.recv()).await
7049        {
7050            // Verify instrument_id field matches request
7051            assert_eq!(
7052                resp.instrument_id, instrument_id,
7053                "TradesResponse.instrument_id should match request"
7054            );
7055
7056            // Verify all trade ticks have the same instrument_id
7057            for tick in resp.data.iter() {
7058                assert_eq!(
7059                    tick.instrument_id, instrument_id,
7060                    "Each TradeTick should have correct instrument_id"
7061                );
7062            }
7063        }
7064    }
7065
7066    #[tokio::test]
7067    async fn test_trades_response_properly_ordered_by_timestamp() {
7068        // Verify trades are ordered by timestamp (ascending)
7069        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7070        set_data_event_sender(sender);
7071
7072        let base_time = chrono::Utc::now();
7073        let http_trades = vec![
7074            crate::http::models::Trade {
7075                id: "trade-oldest".to_string(),
7076                side: OrderSide::Buy,
7077                size: dec!(1.0),
7078                price: dec!(100.0),
7079                created_at: base_time,
7080                created_at_height: 100,
7081                trade_type: crate::common::enums::DydxTradeType::Limit,
7082            },
7083            crate::http::models::Trade {
7084                id: "trade-middle".to_string(),
7085                side: OrderSide::Sell,
7086                size: dec!(2.0),
7087                price: dec!(101.0),
7088                created_at: base_time + chrono::Duration::seconds(1),
7089                created_at_height: 101,
7090                trade_type: crate::common::enums::DydxTradeType::Limit,
7091            },
7092            crate::http::models::Trade {
7093                id: "trade-newest".to_string(),
7094                side: OrderSide::Buy,
7095                size: dec!(3.0),
7096                price: dec!(102.0),
7097                created_at: base_time + chrono::Duration::seconds(2),
7098                created_at_height: 102,
7099                trade_type: crate::common::enums::DydxTradeType::Limit,
7100            },
7101        ];
7102
7103        let trades_response = crate::http::models::TradesResponse {
7104            trades: http_trades,
7105        };
7106
7107        let state = TradesTestState {
7108            response: Arc::new(trades_response),
7109            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7110            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7111        };
7112
7113        let addr = start_trades_test_server(state).await;
7114        let base_url = format!("http://{}", addr);
7115
7116        let client_id = ClientId::from("DYDX-ORDER-TEST");
7117        let config = DydxDataClientConfig {
7118            base_url_http: Some(base_url),
7119            is_testnet: true,
7120            ..Default::default()
7121        };
7122
7123        let http_client = DydxHttpClient::new(
7124            config.base_url_http.clone(),
7125            config.http_timeout_secs,
7126            config.http_proxy_url.clone(),
7127            config.is_testnet,
7128            None,
7129        )
7130        .unwrap();
7131
7132        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7133
7134        let instrument = create_test_instrument_any();
7135        let instrument_id = instrument.id();
7136        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7137        client.instruments.insert(symbol_key, instrument);
7138
7139        let request = RequestTrades::new(
7140            instrument_id,
7141            None,
7142            None,
7143            None,
7144            Some(client_id),
7145            UUID4::new(),
7146            get_atomic_clock_realtime().get_time_ns(),
7147            None,
7148        );
7149
7150        assert!(client.request_trades(&request).is_ok());
7151
7152        let timeout = tokio::time::Duration::from_millis(500);
7153        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7154            tokio::time::timeout(timeout, rx.recv()).await
7155        {
7156            // Verify trades are ordered by timestamp
7157            let trade_ticks = resp.data;
7158            assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7159
7160            // Check ascending timestamp order
7161            for i in 1..trade_ticks.len() {
7162                assert!(
7163                    trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7164                    "Trades should be ordered by timestamp (ts_event) in ascending order"
7165                );
7166            }
7167
7168            // Verify specific ordering
7169            assert!(
7170                trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7171                "First trade should be before second"
7172            );
7173            assert!(
7174                trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7175                "Second trade should be before third"
7176            );
7177        }
7178    }
7179
7180    #[tokio::test]
7181    async fn test_trades_response_all_trade_tick_fields_populated() {
7182        // Verify all TradeTick fields are properly populated
7183        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7184        set_data_event_sender(sender);
7185
7186        let created_at = chrono::Utc::now();
7187        let http_trade = crate::http::models::Trade {
7188            id: "field-test".to_string(),
7189            side: OrderSide::Buy,
7190            size: dec!(5.5),
7191            price: dec!(12345.67),
7192            created_at,
7193            created_at_height: 999,
7194            trade_type: crate::common::enums::DydxTradeType::Limit,
7195        };
7196
7197        let trades_response = crate::http::models::TradesResponse {
7198            trades: vec![http_trade],
7199        };
7200
7201        let state = TradesTestState {
7202            response: Arc::new(trades_response),
7203            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7204            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7205        };
7206
7207        let addr = start_trades_test_server(state).await;
7208        let base_url = format!("http://{}", addr);
7209
7210        let client_id = ClientId::from("DYDX-FIELDS-TEST");
7211        let config = DydxDataClientConfig {
7212            base_url_http: Some(base_url),
7213            is_testnet: true,
7214            ..Default::default()
7215        };
7216
7217        let http_client = DydxHttpClient::new(
7218            config.base_url_http.clone(),
7219            config.http_timeout_secs,
7220            config.http_proxy_url.clone(),
7221            config.is_testnet,
7222            None,
7223        )
7224        .unwrap();
7225
7226        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7227
7228        let instrument = create_test_instrument_any();
7229        let instrument_id = instrument.id();
7230        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7231        client.instruments.insert(symbol_key, instrument);
7232
7233        let request = RequestTrades::new(
7234            instrument_id,
7235            None,
7236            None,
7237            None,
7238            Some(client_id),
7239            UUID4::new(),
7240            get_atomic_clock_realtime().get_time_ns(),
7241            None,
7242        );
7243
7244        assert!(client.request_trades(&request).is_ok());
7245
7246        let timeout = tokio::time::Duration::from_millis(500);
7247        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7248            tokio::time::timeout(timeout, rx.recv()).await
7249        {
7250            assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7251            let tick = &resp.data[0];
7252
7253            // Verify all TradeTick fields are properly populated
7254            assert_eq!(
7255                tick.instrument_id, instrument_id,
7256                "instrument_id should be set"
7257            );
7258            assert!(tick.price.as_f64() > 0.0, "price should be positive");
7259            assert!(tick.size.as_f64() > 0.0, "size should be positive");
7260
7261            // Verify aggressor_side is set (Buy or Sell)
7262            match tick.aggressor_side {
7263                AggressorSide::Buyer | AggressorSide::Seller => {
7264                    // Valid aggressor side
7265                }
7266                AggressorSide::NoAggressor => {
7267                    panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7268                }
7269            }
7270
7271            // Verify trade_id is set
7272            assert!(
7273                !tick.trade_id.to_string().is_empty(),
7274                "trade_id should be set"
7275            );
7276
7277            // Verify timestamps are set and valid
7278            assert!(tick.ts_event > 0, "ts_event should be set");
7279            assert!(tick.ts_init > 0, "ts_init should be set");
7280            assert!(
7281                tick.ts_init >= tick.ts_event,
7282                "ts_init should be >= ts_event"
7283            );
7284        }
7285    }
7286
7287    #[tokio::test]
7288    async fn test_trades_response_includes_metadata() {
7289        // Verify TradesResponse includes all metadata fields
7290        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7291        set_data_event_sender(sender);
7292
7293        let created_at = chrono::Utc::now();
7294        let http_trade = crate::http::models::Trade {
7295            id: "metadata-test".to_string(),
7296            side: OrderSide::Buy,
7297            size: dec!(1.0),
7298            price: dec!(100.0),
7299            created_at,
7300            created_at_height: 100,
7301            trade_type: crate::common::enums::DydxTradeType::Limit,
7302        };
7303
7304        let trades_response = crate::http::models::TradesResponse {
7305            trades: vec![http_trade],
7306        };
7307
7308        let state = TradesTestState {
7309            response: Arc::new(trades_response),
7310            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7311            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7312        };
7313
7314        let addr = start_trades_test_server(state).await;
7315        let base_url = format!("http://{}", addr);
7316
7317        let client_id = ClientId::from("DYDX-META-TEST");
7318        let config = DydxDataClientConfig {
7319            base_url_http: Some(base_url),
7320            is_testnet: true,
7321            ..Default::default()
7322        };
7323
7324        let http_client = DydxHttpClient::new(
7325            config.base_url_http.clone(),
7326            config.http_timeout_secs,
7327            config.http_proxy_url.clone(),
7328            config.is_testnet,
7329            None,
7330        )
7331        .unwrap();
7332
7333        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7334
7335        let instrument = create_test_instrument_any();
7336        let instrument_id = instrument.id();
7337        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7338        client.instruments.insert(symbol_key, instrument);
7339
7340        let request_id = UUID4::new();
7341        let request = RequestTrades::new(
7342            instrument_id,
7343            None,
7344            None,
7345            None,
7346            Some(client_id),
7347            request_id,
7348            get_atomic_clock_realtime().get_time_ns(),
7349            None,
7350        );
7351
7352        assert!(client.request_trades(&request).is_ok());
7353
7354        let timeout = tokio::time::Duration::from_millis(500);
7355        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7356            tokio::time::timeout(timeout, rx.recv()).await
7357        {
7358            // Verify metadata fields
7359            assert_eq!(
7360                resp.correlation_id, request_id,
7361                "correlation_id should match request"
7362            );
7363            assert_eq!(resp.client_id, client_id, "client_id should be set");
7364            assert_eq!(
7365                resp.instrument_id, instrument_id,
7366                "instrument_id should be set"
7367            );
7368            assert!(resp.ts_init > 0, "ts_init should be set");
7369
7370            let _start = resp.start;
7371            let _end = resp.end;
7372            let _params = resp.params;
7373        }
7374    }
7375
7376    #[tokio::test]
7377    async fn test_orderbook_cache_growth_with_many_instruments() {
7378        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7379        set_data_event_sender(sender);
7380
7381        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7382        let config = DydxDataClientConfig {
7383            base_url_http: Some(base_url),
7384            is_testnet: true,
7385            ..Default::default()
7386        };
7387
7388        let http_client = DydxHttpClient::new(
7389            config.base_url_http.clone(),
7390            config.http_timeout_secs,
7391            config.http_proxy_url.clone(),
7392            config.is_testnet,
7393            None,
7394        )
7395        .unwrap();
7396
7397        let client =
7398            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7399
7400        let initial_capacity = client.order_books.capacity();
7401
7402        for i in 0..100 {
7403            let symbol = format!("INSTRUMENT-{i}");
7404            let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7405            client.order_books.insert(
7406                instrument_id,
7407                OrderBook::new(instrument_id, BookType::L2_MBP),
7408            );
7409        }
7410
7411        assert_eq!(client.order_books.len(), 100);
7412        assert!(client.order_books.capacity() >= initial_capacity);
7413
7414        client.order_books.clear();
7415        assert_eq!(client.order_books.len(), 0);
7416    }
7417
7418    #[rstest]
7419    #[ignore]
7420    fn test_instrument_id_validation_rejects_invalid_formats() {
7421        let invalid_ids = vec!["", "INVALID", "NO-VENUE", ".DYDX", "SYMBOL."];
7422
7423        for invalid_id in invalid_ids {
7424            let result = std::panic::catch_unwind(|| {
7425                let _ = InstrumentId::from(invalid_id);
7426            });
7427            assert!(
7428                result.is_err(),
7429                "Expected {invalid_id} to panic or be rejected"
7430            );
7431        }
7432    }
7433
7434    #[rstest]
7435    fn test_instrument_id_validation_accepts_valid_formats() {
7436        let valid_ids = vec![
7437            "BTC-USD-PERP.DYDX",
7438            "ETH-USD-PERP.DYDX",
7439            "SOL-USD.DYDX",
7440            "AVAX-USD-PERP.DYDX",
7441        ];
7442
7443        for valid_id in valid_ids {
7444            let instrument_id = InstrumentId::from(valid_id);
7445            assert!(
7446                !instrument_id.symbol.as_str().is_empty()
7447                    && !instrument_id.venue.as_str().is_empty(),
7448                "Expected {valid_id} to have non-empty symbol and venue"
7449            );
7450        }
7451    }
7452
7453    #[tokio::test]
7454    async fn test_request_bars_with_inverted_date_range() {
7455        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7456        set_data_event_sender(sender);
7457
7458        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7459        let config = DydxDataClientConfig {
7460            base_url_http: Some(base_url),
7461            is_testnet: true,
7462            ..Default::default()
7463        };
7464
7465        let http_client = DydxHttpClient::new(
7466            config.base_url_http.clone(),
7467            config.http_timeout_secs,
7468            config.http_proxy_url.clone(),
7469            config.is_testnet,
7470            None,
7471        )
7472        .unwrap();
7473
7474        let client =
7475            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7476
7477        let instrument = create_test_instrument_any();
7478        let instrument_id = instrument.id();
7479        client
7480            .instruments
7481            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7482
7483        let spec = BarSpecification {
7484            step: std::num::NonZeroUsize::new(1).unwrap(),
7485            aggregation: BarAggregation::Minute,
7486            price_type: PriceType::Last,
7487        };
7488        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7489
7490        let now = chrono::Utc::now();
7491        let start = Some(now);
7492        let end = Some(now - chrono::Duration::hours(1));
7493
7494        let request = RequestBars::new(
7495            bar_type,
7496            start,
7497            end,
7498            None,
7499            Some(ClientId::from("dydx_test")),
7500            UUID4::new(),
7501            get_atomic_clock_realtime().get_time_ns(),
7502            None,
7503        );
7504
7505        let result = client.request_bars(&request);
7506        assert!(result.is_ok());
7507    }
7508
7509    #[tokio::test]
7510    async fn test_request_bars_with_zero_limit() {
7511        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7512        set_data_event_sender(sender);
7513
7514        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7515        let config = DydxDataClientConfig {
7516            base_url_http: Some(base_url),
7517            is_testnet: true,
7518            ..Default::default()
7519        };
7520
7521        let http_client = DydxHttpClient::new(
7522            config.base_url_http.clone(),
7523            config.http_timeout_secs,
7524            config.http_proxy_url.clone(),
7525            config.is_testnet,
7526            None,
7527        )
7528        .unwrap();
7529
7530        let client =
7531            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7532
7533        let instrument = create_test_instrument_any();
7534        let instrument_id = instrument.id();
7535        client
7536            .instruments
7537            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7538
7539        let spec = BarSpecification {
7540            step: std::num::NonZeroUsize::new(1).unwrap(),
7541            aggregation: BarAggregation::Minute,
7542            price_type: PriceType::Last,
7543        };
7544        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7545
7546        let request = RequestBars::new(
7547            bar_type,
7548            None,
7549            None,
7550            Some(std::num::NonZeroUsize::new(1).unwrap()),
7551            Some(ClientId::from("dydx_test")),
7552            UUID4::new(),
7553            get_atomic_clock_realtime().get_time_ns(),
7554            None,
7555        );
7556
7557        let result = client.request_bars(&request);
7558        assert!(result.is_ok());
7559    }
7560
7561    #[tokio::test]
7562    async fn test_request_trades_with_excessive_limit() {
7563        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7564        set_data_event_sender(sender);
7565
7566        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7567        let config = DydxDataClientConfig {
7568            base_url_http: Some(base_url),
7569            is_testnet: true,
7570            ..Default::default()
7571        };
7572
7573        let http_client = DydxHttpClient::new(
7574            config.base_url_http.clone(),
7575            config.http_timeout_secs,
7576            config.http_proxy_url.clone(),
7577            config.is_testnet,
7578            None,
7579        )
7580        .unwrap();
7581
7582        let client =
7583            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7584
7585        let instrument = create_test_instrument_any();
7586        let instrument_id = instrument.id();
7587        client
7588            .instruments
7589            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7590
7591        let request = RequestTrades::new(
7592            instrument_id,
7593            None,
7594            None,
7595            Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7596            Some(ClientId::from("dydx_test")),
7597            UUID4::new(),
7598            get_atomic_clock_realtime().get_time_ns(),
7599            None,
7600        );
7601
7602        let result = client.request_trades(&request);
7603        assert!(result.is_ok());
7604    }
7605}