nautilus_dydx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the dYdX adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use chrono::{DateTime, Utc};
25use dashmap::DashMap;
26use nautilus_common::{
27    live::runner::get_data_event_sender,
28    messages::{
29        DataEvent, DataResponse,
30        data::{
31            BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
32            RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
33            SubscribeBookSnapshots, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
34            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
35            UnsubscribeBookSnapshots, UnsubscribeInstrument, UnsubscribeInstruments,
36            UnsubscribeQuotes, UnsubscribeTrades,
37        },
38    },
39};
40use nautilus_core::{
41    UnixNanos,
42    time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_data::client::DataClient;
45use nautilus_model::{
46    data::{
47        Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, IndexPriceUpdate,
48        OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, QuoteTick,
49    },
50    enums::{BarAggregation, BookAction, BookType, OrderSide, RecordFlag},
51    identifiers::{ClientId, InstrumentId, Venue},
52    instruments::{Instrument, InstrumentAny},
53    orderbook::OrderBook,
54    types::{Price, Quantity, price::PriceRaw},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58use ustr::Ustr;
59
60use crate::{
61    common::{consts::DYDX_VENUE, parse::extract_raw_symbol},
62    config::DydxDataClientConfig,
63    http::client::DydxHttpClient,
64    types::DydxOraclePrice,
65    websocket::client::DydxWebSocketClient,
66};
67
68/// Groups WebSocket message handling dependencies.
69struct WsMessageContext<'a> {
70    data_sender: &'a tokio::sync::mpsc::UnboundedSender<DataEvent>,
71    instruments: &'a Arc<DashMap<Ustr, InstrumentAny>>,
72    order_books: &'a Arc<DashMap<InstrumentId, OrderBook>>,
73    last_quotes: &'a Arc<DashMap<InstrumentId, QuoteTick>>,
74    ws_client: &'a Option<DydxWebSocketClient>,
75    active_orderbook_subs: &'a Arc<DashMap<InstrumentId, ()>>,
76    active_trade_subs: &'a Arc<DashMap<InstrumentId, ()>>,
77    active_bar_subs: &'a Arc<DashMap<(InstrumentId, String), BarType>>,
78    incomplete_bars: &'a Arc<DashMap<BarType, Bar>>,
79}
80
81/// dYdX data client for live market data streaming and historical data requests.
82///
83/// This client integrates with the Nautilus DataEngine to provide:
84/// - Real-time market data via WebSocket subscriptions
85/// - Historical data via REST API requests
86/// - Automatic instrument discovery and caching
87/// - Connection lifecycle management
88#[derive(Debug)]
89pub struct DydxDataClient {
90    /// The client ID for this data client.
91    client_id: ClientId,
92    /// Configuration for the data client.
93    config: DydxDataClientConfig,
94    /// HTTP client for REST API requests.
95    http_client: DydxHttpClient,
96    /// WebSocket client for real-time data streaming (optional).
97    ws_client: Option<DydxWebSocketClient>,
98    /// Whether the client is currently connected.
99    is_connected: AtomicBool,
100    /// Cancellation token for async operations.
101    cancellation_token: CancellationToken,
102    /// Background task handles.
103    tasks: Vec<JoinHandle<()>>,
104    /// Channel sender for emitting data events to the DataEngine.
105    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
106    /// Cached instruments by symbol (shared with HTTP client via `Arc<DashMap<Ustr, InstrumentAny>>`).
107    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
108    /// High-resolution clock for timestamps.
109    clock: &'static AtomicTime,
110    /// Local order books maintained for generating quotes and resolving crosses.
111    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
112    /// Last quote tick per instrument (used for quote generation from book deltas).
113    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
114    /// Incomplete bars cache for bar aggregation.
115    /// Tracks bars not yet closed (ts_event > current_time), keyed by BarType.
116    /// Bars are emitted only when they close (ts_event <= current_time).
117    incomplete_bars: Arc<DashMap<BarType, Bar>>,
118    /// WebSocket topic to BarType mappings.
119    /// Maps dYdX candle topics (e.g., "BTC-USD/1MIN") to Nautilus BarType.
120    /// Used for subscription validation and reconnection recovery.
121    bar_type_mappings: Arc<DashMap<String, BarType>>,
122    /// Active orderbook subscriptions for periodic snapshot refresh.
123    active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
124    /// Active trade subscriptions for reconnection recovery.
125    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
126    /// Active bar/candle subscriptions for reconnection recovery (maps instrument+resolution to BarType).
127    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
128}
129
130impl DydxDataClient {
131    /// Maps Nautilus BarType spec to dYdX candle resolution string.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the bar aggregation or step is not supported by dYdX.
136    fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
137        match spec.step.get() {
138            1 => match spec.aggregation {
139                BarAggregation::Minute => Ok("1MIN"),
140                BarAggregation::Hour => Ok("1HOUR"),
141                BarAggregation::Day => Ok("1DAY"),
142                _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
143            },
144            5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
145            15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
146            30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
147            4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
148            step => anyhow::bail!(
149                "Unsupported bar step: {step} with aggregation {:?}",
150                spec.aggregation
151            ),
152        }
153    }
154
155    /// Creates a new [`DydxDataClient`] instance.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the client fails to initialize.
160    pub fn new(
161        client_id: ClientId,
162        config: DydxDataClientConfig,
163        http_client: DydxHttpClient,
164        ws_client: Option<DydxWebSocketClient>,
165    ) -> anyhow::Result<Self> {
166        let clock = get_atomic_clock_realtime();
167        let data_sender = get_data_event_sender();
168
169        // Clone the instruments cache before moving http_client
170        let instruments_cache = http_client.instruments().clone();
171
172        Ok(Self {
173            client_id,
174            config,
175            http_client,
176            ws_client,
177            is_connected: AtomicBool::new(false),
178            cancellation_token: CancellationToken::new(),
179            tasks: Vec::new(),
180            data_sender,
181            instruments: instruments_cache,
182            clock,
183            order_books: Arc::new(DashMap::new()),
184            last_quotes: Arc::new(DashMap::new()),
185            incomplete_bars: Arc::new(DashMap::new()),
186            bar_type_mappings: Arc::new(DashMap::new()),
187            active_orderbook_subs: Arc::new(DashMap::new()),
188            active_trade_subs: Arc::new(DashMap::new()),
189            active_bar_subs: Arc::new(DashMap::new()),
190        })
191    }
192
193    /// Returns the venue for this data client.
194    #[must_use]
195    pub fn venue(&self) -> Venue {
196        *DYDX_VENUE
197    }
198
199    fn ws_client(&self) -> anyhow::Result<&DydxWebSocketClient> {
200        self.ws_client
201            .as_ref()
202            .context("websocket client not initialized; call connect first")
203    }
204
205    /// Mutable WebSocket client access for operations requiring mutable references.
206    fn ws_client_mut(&mut self) -> anyhow::Result<&mut DydxWebSocketClient> {
207        self.ws_client
208            .as_mut()
209            .context("websocket client not initialized; call connect first")
210    }
211
212    /// Returns `true` when the client is connected.
213    #[must_use]
214    pub fn is_connected(&self) -> bool {
215        self.is_connected.load(Ordering::Relaxed)
216    }
217
218    /// Spawns an async WebSocket task with error handling.
219    ///
220    /// This helper ensures consistent error logging across all subscription methods.
221    fn spawn_ws<F>(&self, fut: F, context: &'static str)
222    where
223        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
224    {
225        tokio::spawn(async move {
226            if let Err(e) = fut.await {
227                tracing::error!("{context}: {e:?}");
228            }
229        });
230    }
231
232    /// Bootstrap instruments from the dYdX Indexer API.
233    ///
234    /// This method:
235    /// 1. Fetches all available instruments from the REST API
236    /// 2. Caches them in the HTTP client
237    /// 3. Caches them in the WebSocket client (if present)
238    /// 4. Populates the local instruments cache
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if:
243    /// - The HTTP request fails.
244    /// - Instrument parsing fails.
245    ///
246    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
247        tracing::info!("Bootstrapping dYdX instruments");
248
249        // Fetch instruments from HTTP API
250        // Note: maker_fee and taker_fee can be None initially - they'll be set to zero
251        let instruments = self
252            .http_client
253            .request_instruments(None, None, None)
254            .await
255            .context("failed to load instruments from dYdX")?;
256
257        if instruments.is_empty() {
258            tracing::warn!("No dYdX instruments were loaded");
259            return Ok(instruments);
260        }
261
262        tracing::info!("Loaded {} dYdX instruments", instruments.len());
263
264        // Cache instruments in HTTP client (request_instruments does NOT cache automatically)
265        self.http_client.cache_instruments(instruments.clone());
266
267        // Cache in WebSocket client if present
268        if let Some(ref ws) = self.ws_client {
269            ws.cache_instruments(instruments.clone());
270        }
271
272        Ok(instruments)
273    }
274}
275
276// Implement DataClient trait for integration with Nautilus DataEngine
277#[async_trait::async_trait(?Send)]
278impl DataClient for DydxDataClient {
279    fn client_id(&self) -> ClientId {
280        self.client_id
281    }
282
283    fn venue(&self) -> Option<Venue> {
284        Some(*DYDX_VENUE)
285    }
286
287    fn start(&mut self) -> anyhow::Result<()> {
288        tracing::info!(
289            client_id = %self.client_id,
290            is_testnet = self.http_client.is_testnet(),
291            "Starting dYdX data client"
292        );
293        Ok(())
294    }
295
296    fn stop(&mut self) -> anyhow::Result<()> {
297        tracing::info!("Stopping dYdX data client {}", self.client_id);
298        self.cancellation_token.cancel();
299        self.is_connected.store(false, Ordering::Relaxed);
300        Ok(())
301    }
302
303    fn reset(&mut self) -> anyhow::Result<()> {
304        tracing::debug!("Resetting dYdX data client {}", self.client_id);
305        self.is_connected.store(false, Ordering::Relaxed);
306        self.cancellation_token = CancellationToken::new();
307        self.tasks.clear();
308        Ok(())
309    }
310
311    fn dispose(&mut self) -> anyhow::Result<()> {
312        tracing::debug!("Disposing dYdX data client {}", self.client_id);
313        self.stop()
314    }
315
316    async fn connect(&mut self) -> anyhow::Result<()> {
317        if self.is_connected() {
318            return Ok(());
319        }
320
321        tracing::info!("Connecting dYdX data client");
322
323        // Bootstrap instruments first
324        self.bootstrap_instruments().await?;
325
326        // Connect WebSocket client and subscribe to market updates
327        if self.ws_client.is_some() {
328            let ws = self.ws_client_mut()?;
329
330            ws.connect()
331                .await
332                .context("failed to connect dYdX websocket")?;
333
334            ws.subscribe_markets()
335                .await
336                .context("failed to subscribe to markets channel")?;
337
338            // Start message processing task (handler already converts to NautilusWsMessage)
339            if let Some(rx) = ws.take_receiver() {
340                let data_tx = self.data_sender.clone();
341                let instruments = self.instruments.clone();
342                let order_books = self.order_books.clone();
343                let last_quotes = self.last_quotes.clone();
344                let ws_client = self.ws_client.clone();
345                let active_orderbook_subs = self.active_orderbook_subs.clone();
346                let active_trade_subs = self.active_trade_subs.clone();
347                let active_bar_subs = self.active_bar_subs.clone();
348                let incomplete_bars = self.incomplete_bars.clone();
349
350                let task = tokio::spawn(async move {
351                    let mut rx = rx;
352                    while let Some(msg) = rx.recv().await {
353                        let ctx = WsMessageContext {
354                            data_sender: &data_tx,
355                            instruments: &instruments,
356                            order_books: &order_books,
357                            last_quotes: &last_quotes,
358                            ws_client: &ws_client,
359                            active_orderbook_subs: &active_orderbook_subs,
360                            active_trade_subs: &active_trade_subs,
361                            active_bar_subs: &active_bar_subs,
362                            incomplete_bars: &incomplete_bars,
363                        };
364                        Self::handle_ws_message(msg, &ctx);
365                    }
366                });
367                self.tasks.push(task);
368            } else {
369                tracing::warn!("No inbound WS receiver available after connect");
370            }
371        }
372
373        // Start orderbook snapshot refresh task
374        self.start_orderbook_refresh_task()?;
375
376        // Start instrument refresh task
377        self.start_instrument_refresh_task()?;
378
379        self.is_connected.store(true, Ordering::Relaxed);
380        tracing::info!("Connected dYdX data client");
381
382        Ok(())
383    }
384
385    async fn disconnect(&mut self) -> anyhow::Result<()> {
386        if !self.is_connected() {
387            return Ok(());
388        }
389
390        tracing::info!("Disconnecting dYdX data client");
391
392        // Disconnect WebSocket client if present
393        if let Some(ref mut ws) = self.ws_client {
394            ws.disconnect()
395                .await
396                .context("failed to disconnect dYdX websocket")?;
397        }
398
399        self.is_connected.store(false, Ordering::Relaxed);
400        tracing::info!("Disconnected dYdX data client");
401
402        Ok(())
403    }
404
405    fn is_connected(&self) -> bool {
406        self.is_connected.load(Ordering::Relaxed)
407    }
408
409    fn is_disconnected(&self) -> bool {
410        !self.is_connected()
411    }
412
413    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
414        // dYdX uses a global markets channel which streams instruments implicitly.
415        // There is no dedicated instruments subscription, so this is a no-op to
416        // mirror the behaviour of `subscribe_instruments`.
417        tracing::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
418        Ok(())
419    }
420
421    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
422        // dYdX does not support per-instrument instrument feed subscriptions.
423        // The markets channel always streams all instruments, so this is a no-op.
424        tracing::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
425        Ok(())
426    }
427
428    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
429        // dYdX markets channel auto-subscribes to all instruments
430        // No explicit subscription needed - already handled in connect()
431        tracing::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
432        Ok(())
433    }
434
435    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
436        // dYdX markets channel auto-subscribes to all instruments
437        // Individual instrument subscriptions not supported - full feed only
438        tracing::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
439        Ok(())
440    }
441
442    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
443        let ws = self.ws_client()?.clone();
444        let instrument_id = cmd.instrument_id;
445
446        // Track active subscription for reconnection recovery
447        self.active_trade_subs.insert(instrument_id, ());
448
449        self.spawn_ws(
450            async move {
451                ws.subscribe_trades(instrument_id)
452                    .await
453                    .context("trade subscription")
454            },
455            "dYdX trade subscription",
456        );
457
458        Ok(())
459    }
460
461    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
462        if cmd.book_type != BookType::L2_MBP {
463            anyhow::bail!(
464                "dYdX only supports L2_MBP order book deltas, received {:?}",
465                cmd.book_type
466            );
467        }
468
469        // Ensure local order book exists for this instrument.
470        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
471
472        // Track active subscription for periodic refresh
473        self.active_orderbook_subs.insert(cmd.instrument_id, ());
474
475        let ws = self.ws_client()?.clone();
476        let instrument_id = cmd.instrument_id;
477
478        self.spawn_ws(
479            async move {
480                ws.subscribe_orderbook(instrument_id)
481                    .await
482                    .context("orderbook subscription")
483            },
484            "dYdX orderbook subscription",
485        );
486
487        Ok(())
488    }
489
490    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
491        if cmd.book_type != BookType::L2_MBP {
492            anyhow::bail!(
493                "dYdX only supports L2_MBP order book snapshots, received {:?}",
494                cmd.book_type
495            );
496        }
497
498        // Track active subscription for periodic refresh
499        self.active_orderbook_subs.insert(cmd.instrument_id, ());
500
501        let ws = self.ws_client()?.clone();
502        let instrument_id = cmd.instrument_id;
503
504        tokio::spawn(async move {
505            if let Err(e) = ws.subscribe_orderbook(instrument_id).await {
506                tracing::error!(
507                    "Failed to subscribe to orderbook snapshot for {instrument_id}: {e:?}"
508                );
509            }
510        });
511
512        Ok(())
513    }
514
515    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
516        // dYdX doesn't have a dedicated quotes channel
517        // Quotes are synthesized from order book deltas
518        tracing::debug!(
519            "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
520            cmd.instrument_id
521        );
522
523        // Simply delegate to book deltas subscription
524        let book_cmd = SubscribeBookDeltas {
525            client_id: cmd.client_id,
526            venue: cmd.venue,
527            instrument_id: cmd.instrument_id,
528            book_type: BookType::L2_MBP,
529            depth: None,
530            managed: false,
531            params: None,
532            command_id: cmd.command_id,
533            ts_init: cmd.ts_init,
534        };
535
536        self.subscribe_book_deltas(&book_cmd)
537    }
538
539    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
540        let ws = self.ws_client()?.clone();
541        let instrument_id = cmd.bar_type.instrument_id();
542        let spec = cmd.bar_type.spec();
543
544        // Use centralized bar spec mapping
545        let resolution = Self::map_bar_spec_to_resolution(&spec)?;
546
547        // Track active subscription for reconnection recovery
548        let bar_type = cmd.bar_type;
549        self.active_bar_subs
550            .insert((instrument_id, resolution.to_string()), bar_type);
551
552        // Register topic → BarType mapping for validation and lookup
553        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
554        let topic = format!("{ticker}/{resolution}");
555        self.bar_type_mappings.insert(topic.clone(), bar_type);
556
557        self.spawn_ws(
558            async move {
559                // Register bar type in handler BEFORE subscribing to avoid race condition
560                if let Err(e) =
561                    ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
562                        topic,
563                        bar_type,
564                    })
565                {
566                    anyhow::bail!("Failed to register bar type: {e}");
567                }
568
569                // Delay to ensure handler processes registration before candle messages arrive
570                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
571
572                ws.subscribe_candles(instrument_id, resolution)
573                    .await
574                    .context("candles subscription")
575            },
576            "dYdX candles subscription",
577        );
578
579        Ok(())
580    }
581
582    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
583        // Remove from active subscription tracking
584        self.active_trade_subs.remove(&cmd.instrument_id);
585
586        let ws = self.ws_client()?.clone();
587        let instrument_id = cmd.instrument_id;
588
589        self.spawn_ws(
590            async move {
591                ws.unsubscribe_trades(instrument_id)
592                    .await
593                    .context("trade unsubscription")
594            },
595            "dYdX trade unsubscription",
596        );
597
598        Ok(())
599    }
600
601    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
602        // Remove from active subscription tracking
603        self.active_orderbook_subs.remove(&cmd.instrument_id);
604
605        let ws = self.ws_client()?.clone();
606        let instrument_id = cmd.instrument_id;
607
608        self.spawn_ws(
609            async move {
610                ws.unsubscribe_orderbook(instrument_id)
611                    .await
612                    .context("orderbook unsubscription")
613            },
614            "dYdX orderbook unsubscription",
615        );
616
617        Ok(())
618    }
619
620    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
621        // dYdX orderbook channel provides both snapshots and deltas.
622        // Unsubscribing snapshots uses the same underlying channel as deltas.
623        // Remove from active subscription tracking
624        self.active_orderbook_subs.remove(&cmd.instrument_id);
625
626        let ws = self.ws_client()?.clone();
627        let instrument_id = cmd.instrument_id;
628
629        self.spawn_ws(
630            async move {
631                ws.unsubscribe_orderbook(instrument_id)
632                    .await
633                    .context("orderbook snapshot unsubscription")
634            },
635            "dYdX orderbook snapshot unsubscription",
636        );
637
638        Ok(())
639    }
640
641    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
642        // dYdX doesn't have a dedicated quotes channel; quotes are derived from book deltas.
643        tracing::debug!(
644            "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
645            cmd.instrument_id
646        );
647
648        let book_cmd = UnsubscribeBookDeltas {
649            instrument_id: cmd.instrument_id,
650            client_id: cmd.client_id,
651            venue: cmd.venue,
652            command_id: cmd.command_id,
653            ts_init: cmd.ts_init,
654            params: cmd.params.clone(),
655        };
656
657        self.unsubscribe_book_deltas(&book_cmd)
658    }
659
660    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
661        let ws = self.ws_client()?.clone();
662        let instrument_id = cmd.bar_type.instrument_id();
663        let spec = cmd.bar_type.spec();
664
665        // Map BarType spec to dYdX candle resolution string
666        let resolution = match spec.step.get() {
667            1 => match spec.aggregation {
668                BarAggregation::Minute => "1MIN",
669                BarAggregation::Hour => "1HOUR",
670                BarAggregation::Day => "1DAY",
671                _ => {
672                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
673                }
674            },
675            5 => {
676                if spec.aggregation == BarAggregation::Minute {
677                    "5MINS"
678                } else {
679                    anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
680                }
681            }
682            15 => {
683                if spec.aggregation == BarAggregation::Minute {
684                    "15MINS"
685                } else {
686                    anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
687                }
688            }
689            30 => {
690                if spec.aggregation == BarAggregation::Minute {
691                    "30MINS"
692                } else {
693                    anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
694                }
695            }
696            4 => {
697                if spec.aggregation == BarAggregation::Hour {
698                    "4HOURS"
699                } else {
700                    anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
701                }
702            }
703            step => {
704                anyhow::bail!("Unsupported bar step: {step}");
705            }
706        };
707
708        // Remove from active subscription tracking
709        self.active_bar_subs
710            .remove(&(instrument_id, resolution.to_string()));
711
712        // Unregister bar type from handler and local mappings
713        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
714        let topic = format!("{ticker}/{resolution}");
715        self.bar_type_mappings.remove(&topic);
716
717        if let Err(e) =
718            ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
719        {
720            tracing::warn!("Failed to unregister bar type: {e}");
721        }
722
723        self.spawn_ws(
724            async move {
725                ws.unsubscribe_candles(instrument_id, resolution)
726                    .await
727                    .context("candles unsubscription")
728            },
729            "dYdX candles unsubscription",
730        );
731
732        Ok(())
733    }
734
735    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
736        let instruments_cache = self.instruments.clone();
737        let sender = self.data_sender.clone();
738        let http = self.http_client.clone();
739        let instrument_id = request.instrument_id;
740        let request_id = request.request_id;
741        let client_id = request.client_id.unwrap_or(self.client_id);
742        let start = request.start;
743        let end = request.end;
744        let params = request.params.clone();
745        let clock = self.clock;
746        let start_nanos = datetime_to_unix_nanos(start);
747        let end_nanos = datetime_to_unix_nanos(end);
748
749        tokio::spawn(async move {
750            // First try to get from cache
751            let symbol = Ustr::from(instrument_id.symbol.as_str());
752            let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
753                tracing::debug!("Found instrument {instrument_id} in cache");
754                Some(cached.clone())
755            } else {
756                // Not in cache, fetch from API
757                tracing::debug!("Instrument {instrument_id} not in cache, fetching from API");
758                match http.request_instruments(None, None, None).await {
759                    Ok(instruments) => {
760                        // Cache all fetched instruments
761                        for inst in &instruments {
762                            upsert_instrument(&instruments_cache, inst.clone());
763                        }
764                        // Find the requested instrument
765                        instruments.into_iter().find(|i| i.id() == instrument_id)
766                    }
767                    Err(e) => {
768                        tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
769                        None
770                    }
771                }
772            };
773
774            if let Some(inst) = instrument {
775                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
776                    request_id,
777                    client_id,
778                    instrument_id,
779                    inst,
780                    start_nanos,
781                    end_nanos,
782                    clock.get_time_ns(),
783                    params,
784                )));
785
786                if let Err(e) = sender.send(DataEvent::Response(response)) {
787                    tracing::error!("Failed to send instrument response: {e}");
788                }
789            } else {
790                tracing::error!("Instrument {instrument_id} not found");
791            }
792        });
793
794        Ok(())
795    }
796
797    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
798        let http = self.http_client.clone();
799        let sender = self.data_sender.clone();
800        let instruments_cache = self.instruments.clone();
801        let request_id = request.request_id;
802        let client_id = request.client_id.unwrap_or(self.client_id);
803        let venue = self.venue();
804        let start = request.start;
805        let end = request.end;
806        let params = request.params.clone();
807        let clock = self.clock;
808        let start_nanos = datetime_to_unix_nanos(start);
809        let end_nanos = datetime_to_unix_nanos(end);
810
811        tokio::spawn(async move {
812            match http.request_instruments(None, None, None).await {
813                Ok(instruments) => {
814                    tracing::info!("Fetched {} instruments from dYdX", instruments.len());
815
816                    // Cache all instruments
817                    for instrument in &instruments {
818                        upsert_instrument(&instruments_cache, instrument.clone());
819                    }
820
821                    let response = DataResponse::Instruments(InstrumentsResponse::new(
822                        request_id,
823                        client_id,
824                        venue,
825                        instruments,
826                        start_nanos,
827                        end_nanos,
828                        clock.get_time_ns(),
829                        params,
830                    ));
831
832                    if let Err(e) = sender.send(DataEvent::Response(response)) {
833                        tracing::error!("Failed to send instruments response: {e}");
834                    }
835                }
836                Err(e) => {
837                    tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
838
839                    // Send empty response on error
840                    let response = DataResponse::Instruments(InstrumentsResponse::new(
841                        request_id,
842                        client_id,
843                        venue,
844                        Vec::new(),
845                        start_nanos,
846                        end_nanos,
847                        clock.get_time_ns(),
848                        params,
849                    ));
850
851                    if let Err(e) = sender.send(DataEvent::Response(response)) {
852                        tracing::error!("Failed to send empty instruments response: {e}");
853                    }
854                }
855            }
856        });
857
858        Ok(())
859    }
860
861    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
862        use nautilus_model::{
863            data::TradeTick,
864            enums::{AggressorSide, OrderSide},
865            identifiers::TradeId,
866        };
867
868        let http = self.http_client.clone();
869        let instruments = self.instruments.clone();
870        let sender = self.data_sender.clone();
871        let instrument_id = request.instrument_id;
872        let start = request.start;
873        let end = request.end;
874        let limit = request.limit.map(|n| n.get() as u32);
875        let request_id = request.request_id;
876        let client_id = request.client_id.unwrap_or(self.client_id);
877        let params = request.params.clone();
878        let clock = self.clock;
879        let start_nanos = datetime_to_unix_nanos(start);
880        let end_nanos = datetime_to_unix_nanos(end);
881
882        tokio::spawn(async move {
883            // dYdX Indexer trades endpoint supports `limit` but not an explicit
884            // date range in this client; we approximate by using the provided
885            // limit and instrument metadata for precision.
886            let ticker = instrument_id
887                .symbol
888                .as_str()
889                .trim_end_matches("-PERP")
890                .to_string();
891
892            // Look up instrument to derive price and size precision.
893            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
894                Some(inst) => inst.clone(),
895                None => {
896                    tracing::error!(
897                        "request_trades: instrument {} not found in cache; cannot convert trades",
898                        instrument_id
899                    );
900                    let ts_now = clock.get_time_ns();
901                    let response = DataResponse::Trades(TradesResponse::new(
902                        request_id,
903                        client_id,
904                        instrument_id,
905                        Vec::new(),
906                        start_nanos,
907                        end_nanos,
908                        ts_now,
909                        params,
910                    ));
911                    if let Err(e) = sender.send(DataEvent::Response(response)) {
912                        tracing::error!("Failed to send empty trades response: {e}");
913                    }
914                    return;
915                }
916            };
917
918            let price_precision = instrument.price_precision();
919            let size_precision = instrument.size_precision();
920
921            match http
922                .inner
923                .get_trades(&ticker, limit)
924                .await
925                .context("failed to request trades from dYdX")
926            {
927                Ok(trades_response) => {
928                    let mut ticks = Vec::new();
929
930                    for trade in trades_response.trades {
931                        let aggressor_side = match trade.side {
932                            OrderSide::Buy => AggressorSide::Buyer,
933                            OrderSide::Sell => AggressorSide::Seller,
934                            _ => continue, // Skip unsupported side
935                        };
936
937                        let price = match Price::from_decimal_dp(trade.price, price_precision) {
938                            Ok(p) => p,
939                            Err(e) => {
940                                tracing::warn!(
941                                    "request_trades: failed to convert price for trade {}: {e}",
942                                    trade.id
943                                );
944                                continue;
945                            }
946                        };
947
948                        let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
949                            Ok(q) => q,
950                            Err(e) => {
951                                tracing::warn!(
952                                    "request_trades: failed to convert size for trade {}: {e}",
953                                    trade.id
954                                );
955                                continue;
956                            }
957                        };
958
959                        let ts_event = match trade.created_at.timestamp_nanos_opt() {
960                            Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
961                            _ => {
962                                tracing::warn!(
963                                    "request_trades: timestamp out of range for trade {}",
964                                    trade.id
965                                );
966                                continue;
967                            }
968                        };
969
970                        // Apply optional time-range filter.
971                        if let Some(start_ts) = start_nanos
972                            && ts_event < start_ts
973                        {
974                            continue;
975                        }
976                        if let Some(end_ts) = end_nanos
977                            && ts_event > end_ts
978                        {
979                            continue;
980                        }
981
982                        let tick = TradeTick::new(
983                            instrument_id,
984                            price,
985                            size,
986                            aggressor_side,
987                            TradeId::new(&trade.id),
988                            ts_event,
989                            clock.get_time_ns(),
990                        );
991                        ticks.push(tick);
992                    }
993
994                    let response = DataResponse::Trades(TradesResponse::new(
995                        request_id,
996                        client_id,
997                        instrument_id,
998                        ticks,
999                        start_nanos,
1000                        end_nanos,
1001                        clock.get_time_ns(),
1002                        params,
1003                    ));
1004
1005                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1006                        tracing::error!("Failed to send trades response: {e}");
1007                    }
1008                }
1009                Err(e) => {
1010                    tracing::error!("Trade request failed for {}: {e:?}", instrument_id);
1011
1012                    let response = DataResponse::Trades(TradesResponse::new(
1013                        request_id,
1014                        client_id,
1015                        instrument_id,
1016                        Vec::new(),
1017                        start_nanos,
1018                        end_nanos,
1019                        clock.get_time_ns(),
1020                        params,
1021                    ));
1022
1023                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1024                        tracing::error!("Failed to send empty trades response: {e}");
1025                    }
1026                }
1027            }
1028        });
1029
1030        Ok(())
1031    }
1032
1033    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1034        use nautilus_model::enums::{AggregationSource, BarAggregation, PriceType};
1035
1036        const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
1037
1038        let bar_type = request.bar_type;
1039        let spec = bar_type.spec();
1040
1041        // Validate bar type requirements
1042        if bar_type.aggregation_source() != AggregationSource::External {
1043            anyhow::bail!(
1044                "dYdX only supports EXTERNAL aggregation, got {:?}",
1045                bar_type.aggregation_source()
1046            );
1047        }
1048
1049        if spec.price_type != PriceType::Last {
1050            anyhow::bail!(
1051                "dYdX only supports LAST price type, got {:?}",
1052                spec.price_type
1053            );
1054        }
1055
1056        // Map BarType spec to dYdX resolution
1057        let resolution = match spec.step.get() {
1058            1 => match spec.aggregation {
1059                BarAggregation::Minute => "1MIN",
1060                BarAggregation::Hour => "1HOUR",
1061                BarAggregation::Day => "1DAY",
1062                _ => {
1063                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
1064                }
1065            },
1066            5 if spec.aggregation == BarAggregation::Minute => "5MINS",
1067            15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1068            30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1069            4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1070            step => {
1071                anyhow::bail!("Unsupported bar step: {step}");
1072            }
1073        };
1074
1075        let http = self.http_client.clone();
1076        let instruments = self.instruments.clone();
1077        let sender = self.data_sender.clone();
1078        let instrument_id = bar_type.instrument_id();
1079        // dYdX ticker does not include the "-PERP" suffix.
1080        let symbol = instrument_id
1081            .symbol
1082            .as_str()
1083            .trim_end_matches("-PERP")
1084            .to_string();
1085        let request_id = request.request_id;
1086        let client_id = request.client_id.unwrap_or(self.client_id);
1087        let params = request.params.clone();
1088        let clock = self.clock;
1089
1090        let start = request.start;
1091        let end = request.end;
1092        let overall_limit = request.limit.map(|n| n.get() as u32);
1093
1094        // Convert optional datetimes to UnixNanos for response metadata
1095        let start_nanos = datetime_to_unix_nanos(start);
1096        let end_nanos = datetime_to_unix_nanos(end);
1097
1098        // Parse resolution string to DydxCandleResolution enum
1099        let resolution_enum = match resolution {
1100            "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1101            "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1102            "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1103            "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1104            "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1105            "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1106            "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1107            _ => {
1108                anyhow::bail!("Unsupported resolution: {resolution}");
1109            }
1110        };
1111
1112        tokio::spawn(async move {
1113            // Determine bar duration in seconds.
1114            let bar_secs: i64 = match spec.aggregation {
1115                BarAggregation::Minute => spec.step.get() as i64 * 60,
1116                BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1117                BarAggregation::Day => spec.step.get() as i64 * 86_400,
1118                _ => {
1119                    tracing::error!(
1120                        "Unsupported aggregation for request_bars: {:?}",
1121                        spec.aggregation
1122                    );
1123                    return;
1124                }
1125            };
1126
1127            // Look up instrument to derive price and size precision.
1128            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1129                Some(inst) => inst.clone(),
1130                None => {
1131                    tracing::error!(
1132                        "request_bars: instrument {} not found in cache; cannot convert candles",
1133                        instrument_id
1134                    );
1135                    let ts_now = clock.get_time_ns();
1136                    let response = DataResponse::Bars(BarsResponse::new(
1137                        request_id,
1138                        client_id,
1139                        bar_type,
1140                        Vec::new(),
1141                        start_nanos,
1142                        end_nanos,
1143                        ts_now,
1144                        params,
1145                    ));
1146                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1147                        tracing::error!("Failed to send empty bars response: {e}");
1148                    }
1149                    return;
1150                }
1151            };
1152
1153            let price_precision = instrument.price_precision();
1154            let size_precision = instrument.size_precision();
1155
1156            let mut all_bars: Vec<Bar> = Vec::new();
1157
1158            // If no explicit date range, fall back to a single request using only `limit`.
1159            let (range_start, range_end) = match (start, end) {
1160                (Some(s), Some(e)) if e > s => (s, e),
1161                _ => {
1162                    let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1163                    match http
1164                        .inner
1165                        .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1166                        .await
1167                    {
1168                        Ok(candles_response) => {
1169                            tracing::debug!(
1170                                "request_bars fetched {} candles without explicit date range",
1171                                candles_response.candles.len()
1172                            );
1173
1174                            for candle in &candles_response.candles {
1175                                match Self::candle_to_bar(
1176                                    candle,
1177                                    bar_type,
1178                                    price_precision,
1179                                    size_precision,
1180                                    bar_secs,
1181                                    clock,
1182                                ) {
1183                                    Ok(bar) => all_bars.push(bar),
1184                                    Err(e) => {
1185                                        tracing::warn!(
1186                                            "Failed to convert dYdX candle to bar for {}: {e}",
1187                                            instrument_id
1188                                        );
1189                                    }
1190                                }
1191                            }
1192
1193                            let current_time_ns = clock.get_time_ns();
1194                            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1195
1196                            let response = DataResponse::Bars(BarsResponse::new(
1197                                request_id,
1198                                client_id,
1199                                bar_type,
1200                                all_bars,
1201                                start_nanos,
1202                                end_nanos,
1203                                current_time_ns,
1204                                params,
1205                            ));
1206
1207                            if let Err(e) = sender.send(DataEvent::Response(response)) {
1208                                tracing::error!("Failed to send bars response: {e}");
1209                            }
1210                        }
1211                        Err(e) => {
1212                            tracing::error!(
1213                                "Failed to request candles for {symbol} without date range: {e:?}"
1214                            );
1215                        }
1216                    }
1217                    return;
1218                }
1219            };
1220
1221            // Calculate expected bars for the range.
1222            let total_secs = (range_end - range_start).num_seconds().max(0);
1223            let expected_bars = (total_secs / bar_secs).max(1) as u64;
1224
1225            tracing::debug!(
1226                "request_bars range {:?} -> {:?}, expected_bars ~= {}",
1227                range_start,
1228                range_end,
1229                expected_bars
1230            );
1231
1232            let mut remaining = overall_limit.unwrap_or(u32::MAX);
1233
1234            // Determine chunk duration using max bars per request.
1235            let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1236            let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1237
1238            let mut chunk_start = range_start;
1239
1240            while chunk_start < range_end && remaining > 0 {
1241                let mut chunk_end = chunk_start + chunk_duration;
1242                if chunk_end > range_end {
1243                    chunk_end = range_end;
1244                }
1245
1246                let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1247
1248                tracing::debug!(
1249                    "request_bars chunk: {} -> {}, limit={}",
1250                    chunk_start,
1251                    chunk_end,
1252                    per_call_limit
1253                );
1254
1255                match http
1256                    .inner
1257                    .get_candles(
1258                        &symbol,
1259                        resolution_enum,
1260                        Some(per_call_limit),
1261                        Some(chunk_start),
1262                        Some(chunk_end),
1263                    )
1264                    .await
1265                {
1266                    Ok(candles_response) => {
1267                        let count = candles_response.candles.len() as u32;
1268
1269                        if count == 0 {
1270                            // No more data available; stop early.
1271                            break;
1272                        }
1273
1274                        // Convert candles to bars and accumulate.
1275                        for candle in &candles_response.candles {
1276                            match Self::candle_to_bar(
1277                                candle,
1278                                bar_type,
1279                                price_precision,
1280                                size_precision,
1281                                bar_secs,
1282                                clock,
1283                            ) {
1284                                Ok(bar) => all_bars.push(bar),
1285                                Err(e) => {
1286                                    tracing::warn!(
1287                                        "Failed to convert dYdX candle to bar for {}: {e}",
1288                                        instrument_id
1289                                    );
1290                                }
1291                            }
1292                        }
1293
1294                        if remaining <= count {
1295                            break;
1296                        } else {
1297                            remaining -= count;
1298                        }
1299                    }
1300                    Err(e) => {
1301                        tracing::error!(
1302                            "Failed to request candles for {symbol} in chunk {:?} -> {:?}: {e:?}",
1303                            chunk_start,
1304                            chunk_end
1305                        );
1306                        break;
1307                    }
1308                }
1309
1310                chunk_start += chunk_duration;
1311            }
1312
1313            tracing::debug!("request_bars completed partitioned fetch for {}", bar_type);
1314
1315            // Filter incomplete bars: only return bars where ts_event < current_time_ns
1316            let current_time_ns = clock.get_time_ns();
1317            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1318
1319            tracing::debug!(
1320                "request_bars filtered to {} completed bars (current_time_ns={})",
1321                all_bars.len(),
1322                current_time_ns
1323            );
1324
1325            let response = DataResponse::Bars(BarsResponse::new(
1326                request_id,
1327                client_id,
1328                bar_type,
1329                all_bars,
1330                start_nanos,
1331                end_nanos,
1332                current_time_ns,
1333                params,
1334            ));
1335
1336            if let Err(e) = sender.send(DataEvent::Response(response)) {
1337                tracing::error!("Failed to send bars response: {e}");
1338            }
1339        });
1340
1341        Ok(())
1342    }
1343}
1344
1345/// Upserts an instrument into the shared cache.
1346fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1347    let symbol = Ustr::from(instrument.id().symbol.as_str());
1348    cache.insert(symbol, instrument);
1349}
1350
1351/// Convert optional DateTime to optional UnixNanos timestamp.
1352fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
1353    value
1354        .and_then(|dt| dt.timestamp_nanos_opt())
1355        .and_then(|nanos| u64::try_from(nanos).ok())
1356        .map(UnixNanos::from)
1357}
1358
1359impl DydxDataClient {
1360    /// Start a task to periodically refresh instruments.
1361    ///
1362    /// This task runs in the background and updates the instrument cache
1363    /// at the configured interval.
1364    ///
1365    /// # Errors
1366    ///
1367    /// Returns an error if a refresh task is already running.
1368    pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1369        let interval_secs = match self.config.instrument_refresh_interval_secs {
1370            Some(secs) if secs > 0 => secs,
1371            _ => {
1372                tracing::info!("Instrument refresh disabled (interval not configured)");
1373                return Ok(());
1374            }
1375        };
1376
1377        let interval = Duration::from_secs(interval_secs);
1378        let http_client = self.http_client.clone();
1379        let instruments_cache = self.instruments.clone();
1380        let cancellation_token = self.cancellation_token.clone();
1381
1382        tracing::info!(
1383            "Starting instrument refresh task (interval: {}s)",
1384            interval_secs
1385        );
1386
1387        let task = tokio::spawn(async move {
1388            let mut interval_timer = tokio::time::interval(interval);
1389            interval_timer.tick().await; // Skip first immediate tick
1390
1391            loop {
1392                tokio::select! {
1393                    _ = cancellation_token.cancelled() => {
1394                        tracing::info!("Instrument refresh task cancelled");
1395                        break;
1396                    }
1397                    _ = interval_timer.tick() => {
1398                        tracing::debug!("Refreshing instruments");
1399
1400                        match http_client.request_instruments(None, None, None).await {
1401                            Ok(instruments) => {
1402                                tracing::debug!("Refreshed {} instruments", instruments.len());
1403
1404                                // Update local cache with refreshed instruments
1405                                for instrument in instruments {
1406                                    upsert_instrument(&instruments_cache, instrument);
1407                                }
1408
1409                                // Also update HTTP client cache via cache_instruments method
1410                                let all_instruments: Vec<_> = instruments_cache
1411                                    .iter()
1412                                    .map(|entry| entry.value().clone())
1413                                    .collect();
1414                                http_client.cache_instruments(all_instruments);
1415                            }
1416                            Err(e) => {
1417                                tracing::error!("Failed to refresh instruments: {}", e);
1418                            }
1419                        }
1420                    }
1421                }
1422            }
1423        });
1424
1425        self.tasks.push(task);
1426        Ok(())
1427    }
1428
1429    /// Start a background task to periodically refresh orderbook snapshots.
1430    ///
1431    /// This prevents stale orderbooks from missed WebSocket messages due to:
1432    /// - Network issues or message drops
1433    /// - dYdX validator delays
1434    /// - WebSocket reconnection gaps
1435    ///
1436    /// The task fetches fresh snapshots via HTTP at the configured interval
1437    /// and applies them to the local orderbooks.
1438    fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1439        let interval_secs = match self.config.orderbook_refresh_interval_secs {
1440            Some(secs) if secs > 0 => secs,
1441            _ => {
1442                tracing::info!("Orderbook snapshot refresh disabled (interval not configured)");
1443                return Ok(());
1444            }
1445        };
1446
1447        let interval = Duration::from_secs(interval_secs);
1448        let http_client = self.http_client.clone();
1449        let instruments = self.instruments.clone();
1450        let order_books = self.order_books.clone();
1451        let active_subs = self.active_orderbook_subs.clone();
1452        let cancellation_token = self.cancellation_token.clone();
1453        let data_sender = self.data_sender.clone();
1454
1455        tracing::info!(
1456            "Starting orderbook snapshot refresh task (interval: {}s)",
1457            interval_secs
1458        );
1459
1460        let task = tokio::spawn(async move {
1461            let mut interval_timer = tokio::time::interval(interval);
1462            interval_timer.tick().await; // Skip first immediate tick
1463
1464            loop {
1465                tokio::select! {
1466                    _ = cancellation_token.cancelled() => {
1467                        tracing::info!("Orderbook refresh task cancelled");
1468                        break;
1469                    }
1470                    _ = interval_timer.tick() => {
1471                        let active_instruments: Vec<InstrumentId> = active_subs
1472                            .iter()
1473                            .map(|entry| *entry.key())
1474                            .collect();
1475
1476                        if active_instruments.is_empty() {
1477                            tracing::debug!("No active orderbook subscriptions to refresh");
1478                            continue;
1479                        }
1480
1481                        tracing::debug!(
1482                            "Refreshing {} orderbook snapshots",
1483                            active_instruments.len()
1484                        );
1485
1486                        for instrument_id in active_instruments {
1487                            // Get instrument for parsing
1488                            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1489                                Some(inst) => inst.clone(),
1490                                None => {
1491                                    tracing::warn!(
1492                                        "Cannot refresh orderbook: no instrument for {}",
1493                                        instrument_id
1494                                    );
1495                                    continue;
1496                                }
1497                            };
1498
1499                            // Fetch snapshot via HTTP (strip -PERP suffix for dYdX API)
1500                            let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1501                            let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1502
1503                            let snapshot = match snapshot_result {
1504                                Ok(s) => s,
1505                                Err(e) => {
1506                                    tracing::error!(
1507                                        "Failed to fetch orderbook snapshot for {}: {}",
1508                                        instrument_id,
1509                                        e
1510                                    );
1511                                    continue;
1512                                }
1513                            };
1514
1515                            // Convert HTTP snapshot to OrderBookDeltas
1516                            let deltas_result = Self::parse_orderbook_snapshot(
1517                                instrument_id,
1518                                &snapshot,
1519                                &instrument,
1520                            );
1521
1522                            let deltas = match deltas_result {
1523                                Ok(d) => d,
1524                                Err(e) => {
1525                                    tracing::error!(
1526                                        "Failed to parse orderbook snapshot for {}: {}",
1527                                        instrument_id,
1528                                        e
1529                                    );
1530                                    continue;
1531                                }
1532                            };
1533
1534                            // Apply snapshot to local orderbook
1535                            if let Some(mut book) = order_books.get_mut(&instrument_id) {
1536                                if let Err(e) = book.apply_deltas(&deltas) {
1537                                    tracing::error!(
1538                                        "Failed to apply orderbook snapshot for {}: {}",
1539                                        instrument_id,
1540                                        e
1541                                    );
1542                                    continue;
1543                                }
1544
1545                                tracing::debug!(
1546                                    "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1547                                    instrument_id,
1548                                    book.best_bid_price(),
1549                                    book.best_ask_price()
1550                                );
1551                            }
1552
1553                            // Emit the snapshot deltas
1554                            let data = NautilusData::from(OrderBookDeltas_API::new(deltas));
1555                            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1556                                tracing::error!("Failed to emit orderbook snapshot: {}", e);
1557                            }
1558                        }
1559                    }
1560                }
1561            }
1562        });
1563
1564        self.tasks.push(task);
1565        Ok(())
1566    }
1567
1568    /// Parse HTTP orderbook snapshot into OrderBookDeltas.
1569    ///
1570    /// Converts the REST API orderbook format into Nautilus deltas with CLEAR + ADD actions.
1571    fn parse_orderbook_snapshot(
1572        instrument_id: InstrumentId,
1573        snapshot: &crate::http::models::OrderbookResponse,
1574        instrument: &InstrumentAny,
1575    ) -> anyhow::Result<OrderBookDeltas> {
1576        use nautilus_model::{
1577            data::{BookOrder, OrderBookDelta},
1578            enums::{BookAction, OrderSide, RecordFlag},
1579            instruments::Instrument,
1580            types::{Price, Quantity},
1581        };
1582
1583        let ts_init = get_atomic_clock_realtime().get_time_ns();
1584        let mut deltas = Vec::new();
1585
1586        // Add clear delta first
1587        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1588
1589        let price_precision = instrument.price_precision();
1590        let size_precision = instrument.size_precision();
1591
1592        let bids_len = snapshot.bids.len();
1593        let asks_len = snapshot.asks.len();
1594
1595        // Add bid levels
1596        for (idx, bid) in snapshot.bids.iter().enumerate() {
1597            let is_last = idx == bids_len - 1 && asks_len == 0;
1598            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1599
1600            let price = Price::from_decimal_dp(bid.price, price_precision)
1601                .context("failed to parse bid price")?;
1602            let size = Quantity::from_decimal_dp(bid.size, size_precision)
1603                .context("failed to parse bid size")?;
1604
1605            let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1606            deltas.push(OrderBookDelta::new(
1607                instrument_id,
1608                BookAction::Add,
1609                order,
1610                flags,
1611                0,
1612                ts_init,
1613                ts_init,
1614            ));
1615        }
1616
1617        // Add ask levels
1618        for (idx, ask) in snapshot.asks.iter().enumerate() {
1619            let is_last = idx == asks_len - 1;
1620            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1621
1622            let price = Price::from_decimal_dp(ask.price, price_precision)
1623                .context("failed to parse ask price")?;
1624            let size = Quantity::from_decimal_dp(ask.size, size_precision)
1625                .context("failed to parse ask size")?;
1626
1627            let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1628            deltas.push(OrderBookDelta::new(
1629                instrument_id,
1630                BookAction::Add,
1631                order,
1632                flags,
1633                0,
1634                ts_init,
1635                ts_init,
1636            ));
1637        }
1638
1639        Ok(OrderBookDeltas::new(instrument_id, deltas))
1640    }
1641
1642    /// Get a cached instrument by symbol.
1643    #[must_use]
1644    pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1645        self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1646    }
1647
1648    /// Get all cached instruments.
1649    #[must_use]
1650    pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1651        self.instruments.iter().map(|i| i.clone()).collect()
1652    }
1653
1654    fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1655        self.order_books
1656            .entry(instrument_id)
1657            .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1658    }
1659
1660    /// Get BarType for a given WebSocket candle topic.
1661    #[must_use]
1662    pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1663        self.bar_type_mappings
1664            .get(topic)
1665            .map(|entry| *entry.value())
1666    }
1667
1668    /// Get all registered bar topics.
1669    #[must_use]
1670    pub fn get_bar_topics(&self) -> Vec<String> {
1671        self.bar_type_mappings
1672            .iter()
1673            .map(|entry| entry.key().clone())
1674            .collect()
1675    }
1676
1677    /// Convert a dYdX HTTP candle into a Nautilus [`Bar`].
1678    ///
1679    /// This mirrors the conversion logic used in other adapters (for example
1680    /// Hyperliquid), using the instrument price/size precision and mapping the
1681    /// candle start time to `ts_init` with `ts_event` at the end of the bar
1682    /// interval.
1683    fn candle_to_bar(
1684        candle: &crate::http::models::Candle,
1685        bar_type: BarType,
1686        price_precision: u8,
1687        size_precision: u8,
1688        bar_secs: i64,
1689        clock: &AtomicTime,
1690    ) -> anyhow::Result<Bar> {
1691        use anyhow::Context;
1692
1693        // Convert candle start time to UnixNanos (ts_init).
1694        let ts_init =
1695            datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1696
1697        // Treat ts_event as the end of the bar interval.
1698        let ts_event_ns = ts_init
1699            .as_u64()
1700            .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1701        let ts_event = UnixNanos::from(ts_event_ns);
1702
1703        let open = Price::from_decimal_dp(candle.open, price_precision)
1704            .context("failed to parse candle open price")?;
1705        let high = Price::from_decimal_dp(candle.high, price_precision)
1706            .context("failed to parse candle high price")?;
1707        let low = Price::from_decimal_dp(candle.low, price_precision)
1708            .context("failed to parse candle low price")?;
1709        let close = Price::from_decimal_dp(candle.close, price_precision)
1710            .context("failed to parse candle close price")?;
1711
1712        // Use base token volume as bar volume.
1713        let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1714            .context("failed to parse candle base_token_volume")?;
1715
1716        Ok(Bar::new(
1717            bar_type, open, high, low, close, volume, ts_event, ts_init,
1718        ))
1719    }
1720
1721    fn handle_ws_message(
1722        message: crate::websocket::enums::NautilusWsMessage,
1723        ctx: &WsMessageContext,
1724    ) {
1725        match message {
1726            crate::websocket::enums::NautilusWsMessage::Data(payloads) => {
1727                Self::handle_data_message(payloads, ctx.data_sender, ctx.incomplete_bars);
1728            }
1729            crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
1730                Self::handle_deltas_message(
1731                    *deltas,
1732                    ctx.data_sender,
1733                    ctx.order_books,
1734                    ctx.last_quotes,
1735                    ctx.instruments,
1736                );
1737            }
1738            crate::websocket::enums::NautilusWsMessage::OraclePrices(oracle_prices) => {
1739                Self::handle_oracle_prices(oracle_prices, ctx.instruments, ctx.data_sender);
1740            }
1741            crate::websocket::enums::NautilusWsMessage::Error(err) => {
1742                tracing::error!("dYdX WS error: {err}");
1743            }
1744            crate::websocket::enums::NautilusWsMessage::Reconnected => {
1745                tracing::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1746
1747                // Re-subscribe to all active subscriptions after WebSocket reconnection
1748                if let Some(ws) = ctx.ws_client {
1749                    let total_subs = ctx.active_orderbook_subs.len()
1750                        + ctx.active_trade_subs.len()
1751                        + ctx.active_bar_subs.len();
1752
1753                    if total_subs == 0 {
1754                        tracing::debug!("No active subscriptions to restore");
1755                        return;
1756                    }
1757
1758                    tracing::info!(
1759                        "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1760                        total_subs,
1761                        ctx.active_orderbook_subs.len(),
1762                        ctx.active_trade_subs.len(),
1763                        ctx.active_bar_subs.len()
1764                    );
1765
1766                    // Re-subscribe to orderbook channels
1767                    for entry in ctx.active_orderbook_subs.iter() {
1768                        let instrument_id = *entry.key();
1769                        let ws_clone = ws.clone();
1770                        tokio::spawn(async move {
1771                            if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1772                                tracing::error!(
1773                                    "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1774                                );
1775                            } else {
1776                                tracing::debug!("Re-subscribed to orderbook for {instrument_id}");
1777                            }
1778                        });
1779                    }
1780
1781                    // Re-subscribe to trade channels
1782                    for entry in ctx.active_trade_subs.iter() {
1783                        let instrument_id = *entry.key();
1784                        let ws_clone = ws.clone();
1785                        tokio::spawn(async move {
1786                            if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1787                                tracing::error!(
1788                                    "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1789                                );
1790                            } else {
1791                                tracing::debug!("Re-subscribed to trades for {instrument_id}");
1792                            }
1793                        });
1794                    }
1795
1796                    // Re-subscribe to candle/bar channels
1797                    for entry in ctx.active_bar_subs.iter() {
1798                        let (instrument_id, resolution) = entry.key();
1799                        let instrument_id = *instrument_id;
1800                        let resolution = resolution.clone();
1801                        let bar_type = *entry.value();
1802                        let ws_clone = ws.clone();
1803
1804                        // Re-register bar type with handler
1805                        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1806                        let topic = format!("{ticker}/{resolution}");
1807                        if let Err(e) = ws.send_command(
1808                            crate::websocket::handler::HandlerCommand::RegisterBarType {
1809                                topic,
1810                                bar_type,
1811                            },
1812                        ) {
1813                            tracing::warn!(
1814                                "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1815                            );
1816                        }
1817
1818                        tokio::spawn(async move {
1819                            if let Err(e) =
1820                                ws_clone.subscribe_candles(instrument_id, &resolution).await
1821                            {
1822                                tracing::error!(
1823                                    "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1824                                );
1825                            } else {
1826                                tracing::debug!(
1827                                    "Re-subscribed to candles for {instrument_id} ({resolution})"
1828                                );
1829                            }
1830                        });
1831                    }
1832
1833                    tracing::info!("Completed re-subscription requests after reconnection");
1834                } else {
1835                    tracing::warn!("WebSocket client not available for re-subscription");
1836                }
1837            }
1838            crate::websocket::enums::NautilusWsMessage::Order(_)
1839            | crate::websocket::enums::NautilusWsMessage::Fill(_)
1840            | crate::websocket::enums::NautilusWsMessage::Position(_)
1841            | crate::websocket::enums::NautilusWsMessage::AccountState(_)
1842            | crate::websocket::enums::NautilusWsMessage::SubaccountSubscribed(_)
1843            | crate::websocket::enums::NautilusWsMessage::SubaccountsChannelData(_) => {
1844                tracing::debug!(
1845                    "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1846                );
1847            }
1848        }
1849    }
1850
1851    fn handle_data_message(
1852        payloads: Vec<NautilusData>,
1853        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1854        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1855    ) {
1856        for data in payloads {
1857            // Filter bars through incomplete bars cache
1858            if let NautilusData::Bar(bar) = data {
1859                Self::handle_bar_message(bar, data_sender, incomplete_bars);
1860            } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1861                tracing::error!("Failed to emit data event: {e}");
1862            }
1863        }
1864    }
1865
1866    /// Handles bar messages by tracking incomplete bars and only emitting completed ones.
1867    ///
1868    /// WebSocket candle updates arrive continuously. This method:
1869    /// - Caches bars where ts_event > current_time (incomplete)
1870    /// - Emits bars where ts_event <= current_time (complete)
1871    /// - Updates cached incomplete bars with latest data
1872    fn handle_bar_message(
1873        bar: Bar,
1874        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1875        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1876    ) {
1877        let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1878        let bar_type = bar.bar_type;
1879
1880        if bar.ts_event <= current_time_ns {
1881            // Bar is complete - emit it and remove from incomplete cache
1882            incomplete_bars.remove(&bar_type);
1883            if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1884                tracing::error!("Failed to emit completed bar: {e}");
1885            }
1886        } else {
1887            // Bar is incomplete - cache it (updates existing entry)
1888            tracing::trace!(
1889                "Caching incomplete bar for {} (ts_event={}, current={})",
1890                bar_type,
1891                bar.ts_event,
1892                current_time_ns
1893            );
1894            incomplete_bars.insert(bar_type, bar);
1895        }
1896    }
1897
1898    /// Resolves a crossed order book by generating synthetic deltas to uncross it.
1899    ///
1900    /// dYdX order books can become crossed due to:
1901    /// - Validator delays in order acknowledgment across the network
1902    /// - Missed or delayed WebSocket messages from the venue
1903    ///
1904    /// This function detects when bid_price >= ask_price and iteratively removes
1905    /// the smaller side while adjusting the larger side until the book is uncrossed.
1906    ///
1907    /// # Algorithm
1908    ///
1909    /// For each crossed level:
1910    /// - If bid_size > ask_size: DELETE ask, UPDATE bid (reduce by ask_size)
1911    /// - If bid_size < ask_size: DELETE bid, UPDATE ask (reduce by bid_size)
1912    /// - If bid_size == ask_size: DELETE both bid and ask
1913    ///
1914    /// The algorithm continues until no more crosses exist or the book is empty.
1915    fn resolve_crossed_order_book(
1916        book: &mut OrderBook,
1917        venue_deltas: OrderBookDeltas,
1918        instrument: &InstrumentAny,
1919    ) -> anyhow::Result<OrderBookDeltas> {
1920        let instrument_id = venue_deltas.instrument_id;
1921        let ts_init = venue_deltas.ts_init;
1922        let mut all_deltas = venue_deltas.deltas.clone();
1923
1924        // Apply the original venue deltas first
1925        book.apply_deltas(&venue_deltas)?;
1926
1927        // Check if orderbook is crossed
1928        let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1929            (book.best_bid_price(), book.best_ask_price())
1930        {
1931            bid_price >= ask_price
1932        } else {
1933            false
1934        };
1935
1936        // Iteratively uncross the orderbook
1937        while is_crossed {
1938            tracing::debug!(
1939                "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1940                instrument_id,
1941                book.best_bid_price(),
1942                book.best_ask_price()
1943            );
1944
1945            let bid_price = match book.best_bid_price() {
1946                Some(p) => p,
1947                None => break,
1948            };
1949            let ask_price = match book.best_ask_price() {
1950                Some(p) => p,
1951                None => break,
1952            };
1953            let bid_size = match book.best_bid_size() {
1954                Some(s) => s,
1955                None => break,
1956            };
1957            let ask_size = match book.best_ask_size() {
1958                Some(s) => s,
1959                None => break,
1960            };
1961
1962            let mut temp_deltas = Vec::new();
1963
1964            if bid_size > ask_size {
1965                // Remove ask level, reduce bid level
1966                let new_bid_size = Quantity::new(
1967                    bid_size.as_f64() - ask_size.as_f64(),
1968                    instrument.size_precision(),
1969                );
1970                temp_deltas.push(OrderBookDelta::new(
1971                    instrument_id,
1972                    BookAction::Update,
1973                    BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1974                    0,
1975                    0,
1976                    ts_init,
1977                    ts_init,
1978                ));
1979                temp_deltas.push(OrderBookDelta::new(
1980                    instrument_id,
1981                    BookAction::Delete,
1982                    BookOrder::new(
1983                        OrderSide::Sell,
1984                        ask_price,
1985                        Quantity::new(0.0, instrument.size_precision()),
1986                        0,
1987                    ),
1988                    0,
1989                    0,
1990                    ts_init,
1991                    ts_init,
1992                ));
1993            } else if bid_size < ask_size {
1994                // Remove bid level, reduce ask level
1995                let new_ask_size = Quantity::new(
1996                    ask_size.as_f64() - bid_size.as_f64(),
1997                    instrument.size_precision(),
1998                );
1999                temp_deltas.push(OrderBookDelta::new(
2000                    instrument_id,
2001                    BookAction::Update,
2002                    BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
2003                    0,
2004                    0,
2005                    ts_init,
2006                    ts_init,
2007                ));
2008                temp_deltas.push(OrderBookDelta::new(
2009                    instrument_id,
2010                    BookAction::Delete,
2011                    BookOrder::new(
2012                        OrderSide::Buy,
2013                        bid_price,
2014                        Quantity::new(0.0, instrument.size_precision()),
2015                        0,
2016                    ),
2017                    0,
2018                    0,
2019                    ts_init,
2020                    ts_init,
2021                ));
2022            } else {
2023                // Equal sizes: remove both levels
2024                temp_deltas.push(OrderBookDelta::new(
2025                    instrument_id,
2026                    BookAction::Delete,
2027                    BookOrder::new(
2028                        OrderSide::Buy,
2029                        bid_price,
2030                        Quantity::new(0.0, instrument.size_precision()),
2031                        0,
2032                    ),
2033                    0,
2034                    0,
2035                    ts_init,
2036                    ts_init,
2037                ));
2038                temp_deltas.push(OrderBookDelta::new(
2039                    instrument_id,
2040                    BookAction::Delete,
2041                    BookOrder::new(
2042                        OrderSide::Sell,
2043                        ask_price,
2044                        Quantity::new(0.0, instrument.size_precision()),
2045                        0,
2046                    ),
2047                    0,
2048                    0,
2049                    ts_init,
2050                    ts_init,
2051                ));
2052            }
2053
2054            // Apply temporary deltas to the book
2055            let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
2056            book.apply_deltas(&temp_deltas_obj)?;
2057            all_deltas.extend(temp_deltas);
2058
2059            // Check if still crossed
2060            is_crossed = if let (Some(bid_price), Some(ask_price)) =
2061                (book.best_bid_price(), book.best_ask_price())
2062            {
2063                bid_price >= ask_price
2064            } else {
2065                false
2066            };
2067        }
2068
2069        // Set F_LAST flag on the final delta
2070        if let Some(last_delta) = all_deltas.last_mut() {
2071            last_delta.flags = RecordFlag::F_LAST as u8;
2072        }
2073
2074        Ok(OrderBookDeltas::new(instrument_id, all_deltas))
2075    }
2076
2077    fn handle_deltas_message(
2078        deltas: OrderBookDeltas,
2079        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2080        order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
2081        last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
2082        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2083    ) {
2084        let instrument_id = deltas.instrument_id;
2085
2086        // Get instrument for crossed orderbook resolution
2087        let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
2088            Some(inst) => inst.clone(),
2089            None => {
2090                tracing::error!(
2091                    "Cannot resolve crossed order book: no instrument for {instrument_id}"
2092                );
2093                // Still emit the raw deltas even without instrument
2094                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
2095                    OrderBookDeltas_API::new(deltas),
2096                ))) {
2097                    tracing::error!("Failed to emit order book deltas: {e}");
2098                }
2099                return;
2100            }
2101        };
2102
2103        // Get or create order book
2104        let mut book = order_books
2105            .entry(instrument_id)
2106            .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
2107
2108        // Resolve crossed orderbook (applies deltas internally)
2109        let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2110        {
2111            Ok(d) => d,
2112            Err(e) => {
2113                tracing::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2114                return;
2115            }
2116        };
2117
2118        // Generate QuoteTick from updated top-of-book
2119        // Edge case: If orderbook is empty after deltas, fall back to last quote
2120        let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2121            (book.best_bid_price(), book.best_ask_price())
2122            && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2123        {
2124            Some(QuoteTick::new(
2125                instrument_id,
2126                bid_price,
2127                ask_price,
2128                bid_size,
2129                ask_size,
2130                resolved_deltas.ts_event,
2131                resolved_deltas.ts_init,
2132            ))
2133        } else {
2134            // Edge case: Empty orderbook levels - use last quote as fallback
2135            if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2136                tracing::debug!(
2137                    "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2138                );
2139                last_quotes.get(&instrument_id).map(|q| *q)
2140            } else {
2141                None
2142            }
2143        };
2144
2145        if let Some(quote) = quote_opt {
2146            // Only emit when top-of-book changes
2147            let emit_quote =
2148                !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2149
2150            if emit_quote {
2151                last_quotes.insert(instrument_id, quote);
2152                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2153                    tracing::error!("Failed to emit quote tick: {e}");
2154                }
2155            }
2156        } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2157            // Partial orderbook (only one side) - log but don't emit
2158            tracing::debug!(
2159                "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2160                book.best_bid_price(),
2161                book.best_ask_price()
2162            );
2163        }
2164
2165        // Emit the resolved order book deltas
2166        let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2167        if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2168            tracing::error!("Failed to emit order book deltas event: {e}");
2169        }
2170    }
2171
2172    fn handle_oracle_prices(
2173        oracle_prices: std::collections::HashMap<
2174            String,
2175            crate::websocket::messages::DydxOraclePriceMarket,
2176        >,
2177        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2178        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2179    ) {
2180        let ts_init = get_atomic_clock_realtime().get_time_ns();
2181
2182        for (symbol_str, oracle_market) in oracle_prices {
2183            let symbol = Ustr::from(&symbol_str);
2184
2185            // Get instrument to access instrument_id
2186            let Some(instrument) = instruments.get(&symbol) else {
2187                tracing::debug!(
2188                    symbol = %symbol,
2189                    "Received oracle price for unknown instrument (not cached yet)"
2190                );
2191                continue;
2192            };
2193
2194            let instrument_id = instrument.id();
2195
2196            // Parse oracle price string to Price
2197            let oracle_price_str = &oracle_market.oracle_price;
2198            let Ok(oracle_price_f64) = oracle_price_str.parse::<f64>() else {
2199                tracing::error!(
2200                    symbol = %symbol,
2201                    price_str = %oracle_price_str,
2202                    "Failed to parse oracle price as f64"
2203                );
2204                continue;
2205            };
2206
2207            let price_precision = instrument.price_precision();
2208            let oracle_price = Price::from_raw(
2209                (oracle_price_f64 * 10_f64.powi(price_precision as i32)) as PriceRaw,
2210                price_precision,
2211            );
2212
2213            let oracle_price_event = DydxOraclePrice::new(
2214                instrument_id,
2215                oracle_price,
2216                ts_init, // Use ts_init as ts_event since dYdX doesn't provide event timestamp
2217                ts_init,
2218            );
2219
2220            tracing::debug!(
2221                instrument_id = %instrument_id,
2222                oracle_price = %oracle_price,
2223                "Received dYdX oracle price: {oracle_price_event:?}"
2224            );
2225
2226            let data = NautilusData::IndexPriceUpdate(IndexPriceUpdate::new(
2227                instrument_id,
2228                oracle_price,
2229                ts_init, // Use ts_init as ts_event since dYdX doesn't provide event timestamp
2230                ts_init,
2231            ));
2232
2233            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2234                tracing::error!("Failed to emit oracle price: {e}");
2235            }
2236        }
2237    }
2238}
2239
2240#[cfg(test)]
2241mod tests {
2242    use std::{collections::HashMap, net::SocketAddr};
2243
2244    use axum::{
2245        Router,
2246        extract::{Path, Query, State},
2247        response::Json,
2248        routing::get,
2249    };
2250    use indexmap::IndexMap;
2251    use nautilus_common::{
2252        live::runner::set_data_event_sender,
2253        messages::{DataEvent, data::DataResponse},
2254    };
2255    use nautilus_core::UUID4;
2256    use nautilus_model::{
2257        data::{
2258            BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2259            TradeTick, order::BookOrder,
2260        },
2261        enums::{
2262            AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2263            PriceType,
2264        },
2265        identifiers::{ClientId, InstrumentId, Symbol, Venue},
2266        instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2267        orderbook::OrderBook,
2268        types::{Currency, Price, Quantity},
2269    };
2270    use rstest::rstest;
2271    use rust_decimal::Decimal;
2272    use rust_decimal_macros::dec;
2273    use tokio::net::TcpListener;
2274
2275    use super::*;
2276    use crate::http::models::{Candle, CandlesResponse};
2277
2278    fn setup_test_env() {
2279        // Initialize data event sender for tests
2280        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2281        set_data_event_sender(sender);
2282    }
2283
2284    #[rstest]
2285    fn test_new_data_client() {
2286        setup_test_env();
2287
2288        let client_id = ClientId::from("DYDX-001");
2289        let config = DydxDataClientConfig::default();
2290        let http_client = DydxHttpClient::default();
2291
2292        let client = DydxDataClient::new(client_id, config, http_client, None);
2293        assert!(client.is_ok());
2294
2295        let client = client.unwrap();
2296        assert_eq!(client.client_id(), client_id);
2297        assert_eq!(client.venue(), *DYDX_VENUE);
2298        assert!(!client.is_connected());
2299    }
2300
2301    #[tokio::test]
2302    async fn test_data_client_lifecycle() {
2303        setup_test_env();
2304
2305        let client_id = ClientId::from("DYDX-001");
2306        let config = DydxDataClientConfig::default();
2307        let http_client = DydxHttpClient::default();
2308
2309        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2310
2311        // Test start
2312        assert!(client.start().is_ok());
2313
2314        // Test stop
2315        assert!(client.stop().is_ok());
2316        assert!(!client.is_connected());
2317
2318        // Test reset
2319        assert!(client.reset().is_ok());
2320
2321        // Test dispose
2322        assert!(client.dispose().is_ok());
2323    }
2324
2325    #[rstest]
2326    fn test_subscribe_unsubscribe_instruments_noop() {
2327        setup_test_env();
2328
2329        let client_id = ClientId::from("DYDX-TEST");
2330        let config = DydxDataClientConfig::default();
2331        let http_client = DydxHttpClient::default();
2332
2333        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2334
2335        let venue = *DYDX_VENUE;
2336        let command_id = UUID4::new();
2337        let ts_init = get_atomic_clock_realtime().get_time_ns();
2338
2339        let subscribe = SubscribeInstruments {
2340            client_id: Some(client_id),
2341            venue,
2342            command_id,
2343            ts_init,
2344            params: None,
2345        };
2346        let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None);
2347
2348        // No-op methods should succeed even without a WebSocket client.
2349        assert!(client.subscribe_instruments(&subscribe).is_ok());
2350        assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2351    }
2352
2353    #[rstest]
2354    fn test_bar_type_mappings_registration() {
2355        setup_test_env();
2356
2357        let client_id = ClientId::from("DYDX-TEST");
2358        let config = DydxDataClientConfig::default();
2359        let http_client = DydxHttpClient::default();
2360
2361        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2362
2363        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2364        let spec = BarSpecification {
2365            step: std::num::NonZeroUsize::new(1).unwrap(),
2366            aggregation: BarAggregation::Minute,
2367            price_type: PriceType::Last,
2368        };
2369        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2370
2371        // Initially no topics registered
2372        assert!(client.get_bar_topics().is_empty());
2373        assert!(client.get_bar_type_for_topic("BTC-USD/1MIN").is_none());
2374
2375        // Register topic
2376        client
2377            .bar_type_mappings
2378            .insert("BTC-USD/1MIN".to_string(), bar_type);
2379
2380        // Verify registration
2381        assert_eq!(client.get_bar_topics().len(), 1);
2382        assert!(
2383            client
2384                .get_bar_topics()
2385                .contains(&"BTC-USD/1MIN".to_string())
2386        );
2387        assert_eq!(
2388            client.get_bar_type_for_topic("BTC-USD/1MIN"),
2389            Some(bar_type)
2390        );
2391
2392        // Register another topic
2393        let spec_5min = BarSpecification {
2394            step: std::num::NonZeroUsize::new(5).unwrap(),
2395            aggregation: BarAggregation::Minute,
2396            price_type: PriceType::Last,
2397        };
2398        let bar_type_5min = BarType::new(instrument_id, spec_5min, AggregationSource::External);
2399        client
2400            .bar_type_mappings
2401            .insert("BTC-USD/5MINS".to_string(), bar_type_5min);
2402
2403        // Verify multiple topics
2404        assert_eq!(client.get_bar_topics().len(), 2);
2405        assert_eq!(
2406            client.get_bar_type_for_topic("BTC-USD/5MINS"),
2407            Some(bar_type_5min)
2408        );
2409    }
2410
2411    #[rstest]
2412    fn test_bar_type_mappings_unregistration() {
2413        setup_test_env();
2414
2415        let client_id = ClientId::from("DYDX-TEST");
2416        let config = DydxDataClientConfig::default();
2417        let http_client = DydxHttpClient::default();
2418
2419        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2420
2421        let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
2422        let spec = BarSpecification {
2423            step: std::num::NonZeroUsize::new(1).unwrap(),
2424            aggregation: BarAggregation::Hour,
2425            price_type: PriceType::Last,
2426        };
2427        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2428
2429        // Register topic
2430        client
2431            .bar_type_mappings
2432            .insert("ETH-USD/1HOUR".to_string(), bar_type);
2433        assert_eq!(client.get_bar_topics().len(), 1);
2434
2435        // Unregister topic
2436        client.bar_type_mappings.remove("ETH-USD/1HOUR");
2437
2438        // Verify unregistration
2439        assert!(client.get_bar_topics().is_empty());
2440        assert!(client.get_bar_type_for_topic("ETH-USD/1HOUR").is_none());
2441    }
2442
2443    #[rstest]
2444    fn test_bar_type_mappings_lookup_nonexistent() {
2445        setup_test_env();
2446
2447        let client_id = ClientId::from("DYDX-TEST");
2448        let config = DydxDataClientConfig::default();
2449        let http_client = DydxHttpClient::default();
2450
2451        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2452
2453        // Lookup non-existent topic
2454        assert!(client.get_bar_type_for_topic("NONEXISTENT/1MIN").is_none());
2455        assert!(client.get_bar_topics().is_empty());
2456    }
2457
2458    #[tokio::test]
2459    async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2460        setup_test_env();
2461
2462        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2463        let instruments = Arc::new(DashMap::new());
2464        let order_books = Arc::new(DashMap::new());
2465        let last_quotes = Arc::new(DashMap::new());
2466        let ws_client: Option<DydxWebSocketClient> = None;
2467        let active_orderbook_subs = Arc::new(DashMap::new());
2468        let active_trade_subs = Arc::new(DashMap::new());
2469        let active_bar_subs = Arc::new(DashMap::new());
2470
2471        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2472        let bar_ts = get_atomic_clock_realtime().get_time_ns();
2473
2474        // Add a test instrument to the cache (required for crossed book resolution)
2475        use nautilus_model::{identifiers::Symbol, instruments::CryptoPerpetual, types::Currency};
2476        let symbol = Symbol::from("BTC-USD-PERP");
2477        let instrument = CryptoPerpetual::new(
2478            instrument_id,
2479            symbol,
2480            Currency::BTC(),
2481            Currency::USD(),
2482            Currency::USD(),
2483            false,
2484            2,
2485            4,
2486            Price::from("0.01"),
2487            Quantity::from("0.0001"),
2488            None,
2489            None,
2490            None,
2491            None,
2492            None,
2493            None,
2494            None,
2495            None,
2496            None,
2497            None,
2498            None,
2499            None,
2500            bar_ts,
2501            bar_ts,
2502        );
2503        instruments.insert(
2504            Ustr::from("BTC-USD-PERP"),
2505            InstrumentAny::CryptoPerpetual(instrument),
2506        );
2507
2508        let price = Price::from("100.00");
2509        let size = Quantity::from("1.0");
2510
2511        // Create both bid and ask deltas to generate a quote
2512        let bid_delta = OrderBookDelta::new(
2513            instrument_id,
2514            BookAction::Add,
2515            BookOrder::new(OrderSide::Buy, price, size, 1),
2516            0,
2517            1,
2518            bar_ts,
2519            bar_ts,
2520        );
2521        let ask_delta = OrderBookDelta::new(
2522            instrument_id,
2523            BookAction::Add,
2524            BookOrder::new(OrderSide::Sell, Price::from("101.00"), size, 1),
2525            0,
2526            1,
2527            bar_ts,
2528            bar_ts,
2529        );
2530        let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2531
2532        let message = crate::websocket::enums::NautilusWsMessage::Deltas(Box::new(deltas));
2533
2534        let incomplete_bars = Arc::new(DashMap::new());
2535        let ctx = WsMessageContext {
2536            data_sender: &sender,
2537            instruments: &instruments,
2538            order_books: &order_books,
2539            last_quotes: &last_quotes,
2540            ws_client: &ws_client,
2541            active_orderbook_subs: &active_orderbook_subs,
2542            active_trade_subs: &active_trade_subs,
2543            active_bar_subs: &active_bar_subs,
2544            incomplete_bars: &incomplete_bars,
2545        };
2546        DydxDataClient::handle_ws_message(message, &ctx);
2547
2548        // Ensure order book was created and top-of-book quote cached.
2549        assert!(order_books.get(&instrument_id).is_some());
2550        assert!(last_quotes.get(&instrument_id).is_some());
2551
2552        // Ensure a quote and deltas Data events were emitted.
2553        let mut saw_quote = false;
2554        let mut saw_deltas = false;
2555
2556        while let Ok(event) = rx.try_recv() {
2557            if let DataEvent::Data(data) = event {
2558                match data {
2559                    NautilusData::Quote(_) => saw_quote = true,
2560                    NautilusData::Deltas(_) => saw_deltas = true,
2561                    _ => {}
2562                }
2563            }
2564        }
2565
2566        assert!(saw_quote);
2567        assert!(saw_deltas);
2568    }
2569
2570    #[rstest]
2571    fn test_handle_ws_message_error_does_not_panic() {
2572        // Ensure malformed/error WebSocket messages are logged and ignored
2573        // without panicking or affecting client state.
2574        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2575        let instruments = Arc::new(DashMap::new());
2576        let order_books = Arc::new(DashMap::new());
2577        let last_quotes = Arc::new(DashMap::new());
2578        let ws_client: Option<DydxWebSocketClient> = None;
2579        let active_orderbook_subs = Arc::new(DashMap::new());
2580        let active_trade_subs = Arc::new(DashMap::new());
2581        let active_bar_subs = Arc::new(DashMap::new());
2582        let incomplete_bars = Arc::new(DashMap::new());
2583
2584        let ctx = WsMessageContext {
2585            data_sender: &sender,
2586            instruments: &instruments,
2587            order_books: &order_books,
2588            last_quotes: &last_quotes,
2589            ws_client: &ws_client,
2590            active_orderbook_subs: &active_orderbook_subs,
2591            active_trade_subs: &active_trade_subs,
2592            active_bar_subs: &active_bar_subs,
2593            incomplete_bars: &incomplete_bars,
2594        };
2595
2596        let err = crate::websocket::error::DydxWebSocketError::from_message(
2597            "malformed WebSocket payload".to_string(),
2598        );
2599
2600        DydxDataClient::handle_ws_message(
2601            crate::websocket::enums::NautilusWsMessage::Error(err),
2602            &ctx,
2603        );
2604    }
2605
2606    #[tokio::test]
2607    async fn test_request_bars_partitioning_math_does_not_panic() {
2608        setup_test_env();
2609
2610        let client_id = ClientId::from("DYDX-BARS");
2611        let config = DydxDataClientConfig::default();
2612        let http_client = DydxHttpClient::default();
2613
2614        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2615
2616        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2617        let spec = BarSpecification {
2618            step: std::num::NonZeroUsize::new(1).unwrap(),
2619            aggregation: BarAggregation::Minute,
2620            price_type: PriceType::Last,
2621        };
2622        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2623
2624        let now = Utc::now();
2625        let start = Some(now - chrono::Duration::hours(10));
2626        let end = Some(now);
2627
2628        let request = RequestBars::new(
2629            bar_type,
2630            start,
2631            end,
2632            None,
2633            Some(client_id),
2634            UUID4::new(),
2635            get_atomic_clock_realtime().get_time_ns(),
2636            None,
2637        );
2638
2639        // We only verify that the partitioning logic executes without panicking;
2640        // HTTP calls are allowed to fail and are handled internally.
2641        assert!(client.request_bars(&request).is_ok());
2642    }
2643
2644    #[tokio::test]
2645    async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2646        setup_test_env();
2647
2648        // Prepare a simple candles response served by a local Axum HTTP server.
2649        let now = Utc::now();
2650        let candle = crate::http::models::Candle {
2651            started_at: now - chrono::Duration::minutes(1),
2652            ticker: "BTC-USD".to_string(),
2653            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2654            open: dec!(100.0),
2655            high: dec!(101.0),
2656            low: dec!(99.0),
2657            close: dec!(100.5),
2658            base_token_volume: dec!(1.0),
2659            usd_volume: dec!(100.0),
2660            trades: 10,
2661            starting_open_interest: dec!(1000.0),
2662        };
2663        let candles_response = crate::http::models::CandlesResponse {
2664            candles: vec![candle],
2665        };
2666        let state = CandlesTestState {
2667            response: Arc::new(candles_response),
2668        };
2669        let addr = start_candles_test_server(state).await;
2670        let base_url = format!("http://{addr}");
2671
2672        let client_id = ClientId::from("DYDX-BARS-MONTHS");
2673        let config = DydxDataClientConfig {
2674            base_url_http: Some(base_url),
2675            is_testnet: true,
2676            ..Default::default()
2677        };
2678
2679        let http_client = DydxHttpClient::new(
2680            config.base_url_http.clone(),
2681            config.http_timeout_secs,
2682            config.http_proxy_url.clone(),
2683            config.is_testnet,
2684            None,
2685        )
2686        .unwrap();
2687
2688        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2689
2690        // Seed instrument cache so request_bars can resolve precision.
2691        let instrument = create_test_instrument_any();
2692        let instrument_id = instrument.id();
2693        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2694        client.instruments.insert(symbol_key, instrument);
2695
2696        let spec = BarSpecification {
2697            step: std::num::NonZeroUsize::new(1).unwrap(),
2698            aggregation: BarAggregation::Minute,
2699            price_type: PriceType::Last,
2700        };
2701        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2702
2703        // Use a date range spanning multiple months to exercise partitioning math.
2704        let start = Some(now - chrono::Duration::days(90));
2705        let end = Some(now);
2706
2707        // Limit the total number of bars so the test completes quickly.
2708        let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2709
2710        let request = RequestBars::new(
2711            bar_type,
2712            start,
2713            end,
2714            limit,
2715            Some(client_id),
2716            UUID4::new(),
2717            get_atomic_clock_realtime().get_time_ns(),
2718            None,
2719        );
2720
2721        assert!(client.request_bars(&request).is_ok());
2722    }
2723
2724    #[derive(Clone)]
2725    struct OrderbookTestState {
2726        snapshot: Arc<crate::http::models::OrderbookResponse>,
2727    }
2728
2729    #[derive(Clone)]
2730    struct TradesTestState {
2731        response: Arc<crate::http::models::TradesResponse>,
2732        last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
2733        last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
2734    }
2735
2736    #[derive(Clone)]
2737    struct CandlesTestState {
2738        response: Arc<crate::http::models::CandlesResponse>,
2739    }
2740
2741    async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2742        async fn handle_orderbook(
2743            Path(_ticker): Path<String>,
2744            State(state): State<OrderbookTestState>,
2745        ) -> Json<crate::http::models::OrderbookResponse> {
2746            Json((*state.snapshot).clone())
2747        }
2748
2749        let router = Router::new().route(
2750            "/v4/orderbooks/perpetualMarket/{ticker}",
2751            get(handle_orderbook).with_state(state),
2752        );
2753
2754        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2755        let addr = listener.local_addr().unwrap();
2756
2757        tokio::spawn(async move {
2758            axum::serve(listener, router.into_make_service())
2759                .await
2760                .unwrap();
2761        });
2762
2763        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2764        addr
2765    }
2766
2767    async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2768        async fn handle_trades(
2769            Path(ticker): Path<String>,
2770            Query(params): Query<HashMap<String, String>>,
2771            State(state): State<TradesTestState>,
2772        ) -> Json<crate::http::models::TradesResponse> {
2773            {
2774                let mut last_ticker = state.last_ticker.lock().await;
2775                *last_ticker = Some(ticker);
2776            }
2777
2778            let limit = params
2779                .get("limit")
2780                .and_then(|value| value.parse::<u32>().ok());
2781            {
2782                let mut last_limit = state.last_limit.lock().await;
2783                *last_limit = Some(limit);
2784            }
2785
2786            Json((*state.response).clone())
2787        }
2788
2789        let router = Router::new().route(
2790            "/v4/trades/perpetualMarket/{ticker}",
2791            get(handle_trades).with_state(state),
2792        );
2793
2794        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2795        let addr = listener.local_addr().unwrap();
2796
2797        tokio::spawn(async move {
2798            axum::serve(listener, router.into_make_service())
2799                .await
2800                .unwrap();
2801        });
2802
2803        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2804        addr
2805    }
2806
2807    async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2808        async fn handle_candles(
2809            Path(_ticker): Path<String>,
2810            Query(_params): Query<HashMap<String, String>>,
2811            State(state): State<CandlesTestState>,
2812        ) -> Json<crate::http::models::CandlesResponse> {
2813            Json((*state.response).clone())
2814        }
2815
2816        let router = Router::new().route(
2817            "/v4/candles/perpetualMarkets/{ticker}",
2818            get(handle_candles).with_state(state),
2819        );
2820
2821        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2822        let addr = listener.local_addr().unwrap();
2823
2824        tokio::spawn(async move {
2825            axum::serve(listener, router.into_make_service())
2826                .await
2827                .unwrap();
2828        });
2829
2830        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2831        addr
2832    }
2833
2834    fn create_test_instrument_any() -> InstrumentAny {
2835        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2836
2837        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2838            instrument_id,
2839            instrument_id.symbol,
2840            Currency::BTC(),
2841            Currency::USD(),
2842            Currency::USD(),
2843            false,
2844            2,                                // price_precision
2845            8,                                // size_precision
2846            Price::new(0.01, 2),              // price_increment
2847            Quantity::new(0.001, 8),          // size_increment
2848            Some(Quantity::new(1.0, 0)),      // multiplier
2849            Some(Quantity::new(0.001, 8)),    // lot_size
2850            Some(Quantity::new(100000.0, 8)), // max_quantity
2851            Some(Quantity::new(0.001, 8)),    // min_quantity
2852            None,                             // max_notional
2853            None,                             // min_notional
2854            Some(Price::new(1000000.0, 2)),   // max_price
2855            Some(Price::new(0.01, 2)),        // min_price
2856            Some(dec!(0.05)),                 // margin_init
2857            Some(dec!(0.03)),                 // margin_maint
2858            Some(dec!(0.0002)),               // maker_fee
2859            Some(dec!(0.0005)),               // taker_fee
2860            UnixNanos::default(),             // ts_event
2861            UnixNanos::default(),             // ts_init
2862        ))
2863    }
2864
2865    // ------------------------------------------------------------------------
2866    // Precision & bar conversion tests
2867    // ------------------------------------------------------------------------
2868
2869    #[tokio::test]
2870    async fn test_candle_to_bar_price_size_edge_cases() {
2871        setup_test_env();
2872
2873        let clock = get_atomic_clock_realtime();
2874        let now = Utc::now();
2875
2876        // Very large prices and sizes (edge cases).
2877        let candle = Candle {
2878            started_at: now,
2879            ticker: "BTC-USD".to_string(),
2880            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2881            open: dec!(123456789.123456),
2882            high: dec!(987654321.987654),  // high is max
2883            low: dec!(123456.789),         // low is min
2884            close: dec!(223456789.123456), // close between low and high
2885            base_token_volume: dec!(0.00000001),
2886            usd_volume: dec!(1234500.0),
2887            trades: 42,
2888            starting_open_interest: dec!(1000.0),
2889        };
2890
2891        let instrument = create_test_instrument_any();
2892        let instrument_id = instrument.id();
2893        let spec = BarSpecification {
2894            step: std::num::NonZeroUsize::new(1).unwrap(),
2895            aggregation: BarAggregation::Minute,
2896            price_type: PriceType::Last,
2897        };
2898        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2899
2900        let bar = DydxDataClient::candle_to_bar(
2901            &candle,
2902            bar_type,
2903            instrument.price_precision(),
2904            instrument.size_precision(),
2905            60,
2906            clock,
2907        )
2908        .expect("candle_to_bar should handle large/scientific values");
2909
2910        assert!(bar.open.as_f64() > 0.0);
2911        assert!(bar.high.as_f64() >= bar.low.as_f64());
2912        assert!(bar.volume.as_f64() > 0.0);
2913    }
2914
2915    #[tokio::test]
2916    async fn test_candle_to_bar_ts_event_overflow_safe() {
2917        setup_test_env();
2918
2919        let clock = get_atomic_clock_realtime();
2920        let now = Utc::now();
2921
2922        let candle = Candle {
2923            started_at: now,
2924            ticker: "BTC-USD".to_string(),
2925            resolution: crate::common::enums::DydxCandleResolution::OneDay,
2926            open: Decimal::from(1),
2927            high: Decimal::from(1),
2928            low: Decimal::from(1),
2929            close: Decimal::from(1),
2930            base_token_volume: Decimal::from(1),
2931            usd_volume: Decimal::from(1),
2932            trades: 1,
2933            starting_open_interest: Decimal::from(1),
2934        };
2935
2936        let instrument = create_test_instrument_any();
2937        let instrument_id = instrument.id();
2938        let spec = BarSpecification {
2939            step: std::num::NonZeroUsize::new(1).unwrap(),
2940            aggregation: BarAggregation::Day,
2941            price_type: PriceType::Last,
2942        };
2943        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2944
2945        // Use an intentionally large bar_secs to exercise saturating_add path.
2946        let bar_secs = i64::MAX / 1_000_000_000;
2947        let bar = DydxDataClient::candle_to_bar(
2948            &candle,
2949            bar_type,
2950            instrument.price_precision(),
2951            instrument.size_precision(),
2952            bar_secs,
2953            clock,
2954        )
2955        .expect("candle_to_bar should not overflow on ts_event");
2956
2957        assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2958    }
2959
2960    #[tokio::test]
2961    async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2962        // Simulate bars with ts_event both before and after current_time_ns and
2963        // ensure only completed bars (ts_event < now) are retained.
2964        let clock = get_atomic_clock_realtime();
2965        let now = Utc::now();
2966
2967        // Use a dedicated data channel for this test and register it
2968        // before constructing the data client.
2969        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2970        set_data_event_sender(sender);
2971
2972        // two candles: one in the past, one in the future
2973        let candle_past = Candle {
2974            started_at: now - chrono::Duration::minutes(2),
2975            ticker: "BTC-USD".to_string(),
2976            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2977            open: Decimal::from(1),
2978            high: Decimal::from(2),
2979            low: Decimal::from(1),
2980            close: Decimal::from(1),
2981            base_token_volume: Decimal::from(1),
2982            usd_volume: Decimal::from(1),
2983            trades: 1,
2984            starting_open_interest: Decimal::from(1),
2985        };
2986        let candle_future = Candle {
2987            started_at: now + chrono::Duration::minutes(2),
2988            ..candle_past.clone()
2989        };
2990
2991        let candles_response = CandlesResponse {
2992            candles: vec![candle_past, candle_future],
2993        };
2994
2995        let state = CandlesTestState {
2996            response: Arc::new(candles_response),
2997        };
2998        let addr = start_candles_test_server(state).await;
2999        let base_url = format!("http://{addr}");
3000
3001        let client_id = ClientId::from("DYDX-BARS-SKEW");
3002        let config = DydxDataClientConfig {
3003            base_url_http: Some(base_url),
3004            is_testnet: true,
3005            ..Default::default()
3006        };
3007
3008        let http_client = DydxHttpClient::new(
3009            config.base_url_http.clone(),
3010            config.http_timeout_secs,
3011            config.http_proxy_url.clone(),
3012            config.is_testnet,
3013            None,
3014        )
3015        .unwrap();
3016
3017        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3018
3019        let instrument = create_test_instrument_any();
3020        let instrument_id = instrument.id();
3021        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3022        client.instruments.insert(symbol_key, instrument);
3023
3024        let spec = BarSpecification {
3025            step: std::num::NonZeroUsize::new(1).unwrap(),
3026            aggregation: BarAggregation::Minute,
3027            price_type: PriceType::Last,
3028        };
3029        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
3030
3031        let request = RequestBars::new(
3032            bar_type,
3033            Some(now - chrono::Duration::minutes(5)),
3034            Some(now + chrono::Duration::minutes(5)),
3035            None,
3036            Some(client_id),
3037            UUID4::new(),
3038            clock.get_time_ns(),
3039            None,
3040        );
3041
3042        assert!(client.request_bars(&request).is_ok());
3043
3044        let timeout = tokio::time::Duration::from_secs(3);
3045        if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
3046            tokio::time::timeout(timeout, rx.recv()).await
3047        {
3048            // Only the past candle should remain after filtering.
3049            assert_eq!(resp.data.len(), 1);
3050        }
3051    }
3052
3053    #[rstest]
3054    fn test_decimal_to_f64_precision_loss_within_tolerance() {
3055        // Verify converting via Price/Quantity preserves reasonable precision.
3056        let price_value = 12345.125_f64;
3057        let qty_value = 0.00012345_f64;
3058
3059        let price = Price::new(price_value, 6);
3060        let qty = Quantity::new(qty_value, 8);
3061
3062        let price_diff = (price.as_f64() - price_value).abs();
3063        let qty_diff = (qty.as_f64() - qty_value).abs();
3064
3065        // Differences should be well within a tiny epsilon.
3066        assert!(price_diff < 1e-10);
3067        assert!(qty_diff < 1e-12);
3068    }
3069
3070    #[tokio::test]
3071    async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
3072        // Set up a dedicated data event channel for this test.
3073        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3074        set_data_event_sender(sender);
3075
3076        // Prepare a static orderbook snapshot served by a local Axum HTTP server.
3077        let snapshot = crate::http::models::OrderbookResponse {
3078            bids: vec![crate::http::models::OrderbookLevel {
3079                price: dec!(100.0),
3080                size: dec!(1.0),
3081            }],
3082            asks: vec![crate::http::models::OrderbookLevel {
3083                price: dec!(101.0),
3084                size: dec!(2.0),
3085            }],
3086        };
3087        let state = OrderbookTestState {
3088            snapshot: Arc::new(snapshot),
3089        };
3090        let addr = start_orderbook_test_server(state).await;
3091        let base_url = format!("http://{addr}");
3092
3093        // Configure the data client with a short refresh interval and mock HTTP base URL.
3094        let client_id = ClientId::from("DYDX-REFRESH");
3095        let config = DydxDataClientConfig {
3096            is_testnet: true,
3097            base_url_http: Some(base_url),
3098            orderbook_refresh_interval_secs: Some(1),
3099            instrument_refresh_interval_secs: None,
3100            ..Default::default()
3101        };
3102
3103        let http_client = DydxHttpClient::new(
3104            config.base_url_http.clone(),
3105            config.http_timeout_secs,
3106            config.http_proxy_url.clone(),
3107            config.is_testnet,
3108            None,
3109        )
3110        .unwrap();
3111
3112        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3113
3114        // Seed instruments and orderbook state for a single instrument.
3115        let instrument = create_test_instrument_any();
3116        let instrument_id = instrument.id();
3117        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3118        client.instruments.insert(symbol_key, instrument);
3119        client.order_books.insert(
3120            instrument_id,
3121            OrderBook::new(instrument_id, BookType::L2_MBP),
3122        );
3123        client.active_orderbook_subs.insert(instrument_id, ());
3124
3125        // Start the refresh task and wait for a snapshot to be applied and emitted.
3126        client.start_orderbook_refresh_task().unwrap();
3127
3128        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3129        let mut saw_snapshot_event = false;
3130
3131        while std::time::Instant::now() < deadline {
3132            if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3133                tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3134            {
3135                saw_snapshot_event = true;
3136                break;
3137            }
3138        }
3139
3140        assert!(
3141            saw_snapshot_event,
3142            "expected at least one snapshot deltas event from refresh task"
3143        );
3144
3145        // Verify that the local orderbook has been updated with the snapshot.
3146        let book = client
3147            .order_books
3148            .get(&instrument_id)
3149            .expect("orderbook should exist after refresh");
3150        let best_bid = book.best_bid_price().expect("best bid should be set");
3151        let best_ask = book.best_ask_price().expect("best ask should be set");
3152
3153        assert_eq!(best_bid, Price::from("100.00"));
3154        assert_eq!(best_ask, Price::from("101.00"));
3155    }
3156
3157    #[rstest]
3158    fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3159        // Test scenario: bid_size > ask_size
3160        // Expected: DELETE ask, UPDATE bid (reduce by ask_size)
3161        let instrument = create_test_instrument_any();
3162        let instrument_id = instrument.id();
3163        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3164        let ts_init = get_atomic_clock_realtime().get_time_ns();
3165
3166        // Create initial non-crossed book
3167        let initial_deltas = vec![
3168            OrderBookDelta::new(
3169                instrument_id,
3170                BookAction::Add,
3171                BookOrder::new(
3172                    OrderSide::Buy,
3173                    Price::from("99.00"),
3174                    Quantity::from("1.0"),
3175                    0,
3176                ),
3177                0,
3178                0,
3179                ts_init,
3180                ts_init,
3181            ),
3182            OrderBookDelta::new(
3183                instrument_id,
3184                BookAction::Add,
3185                BookOrder::new(
3186                    OrderSide::Sell,
3187                    Price::from("101.00"),
3188                    Quantity::from("2.0"),
3189                    0,
3190                ),
3191                0,
3192                0,
3193                ts_init,
3194                ts_init,
3195            ),
3196        ];
3197        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3198            .unwrap();
3199
3200        // Create crossed orderbook: bid @ 102.00 (size 5.0) > ask @ 101.00 (size 2.0)
3201        let crossed_deltas = vec![OrderBookDelta::new(
3202            instrument_id,
3203            BookAction::Add,
3204            BookOrder::new(
3205                OrderSide::Buy,
3206                Price::from("102.00"),
3207                Quantity::from("5.0"),
3208                0,
3209            ),
3210            0,
3211            0,
3212            ts_init,
3213            ts_init,
3214        )];
3215        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3216
3217        let resolved =
3218            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3219                .unwrap();
3220
3221        // Verify resolution: ask @ 101.00 should be deleted
3222        // bid @ 102.00 should remain but reduced (note: precision affects exact value)
3223        assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3224        assert!(book.best_bid_size().unwrap().as_f64() < 5.0); // Reduced from original
3225        assert!(
3226            book.best_ask_price().is_none()
3227                || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3228        ); // No longer crossed
3229
3230        // Verify synthetic deltas were generated
3231        assert!(resolved.deltas.len() > 1); // Original delta + synthetic resolution deltas
3232    }
3233
3234    #[rstest]
3235    fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3236        // Test scenario: bid_size < ask_size
3237        // Expected: DELETE bid, UPDATE ask (reduce by bid_size)
3238        let instrument = create_test_instrument_any();
3239        let instrument_id = instrument.id();
3240        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3241        let ts_init = get_atomic_clock_realtime().get_time_ns();
3242
3243        // Create initial non-crossed book
3244        let initial_deltas = vec![
3245            OrderBookDelta::new(
3246                instrument_id,
3247                BookAction::Add,
3248                BookOrder::new(
3249                    OrderSide::Buy,
3250                    Price::from("99.00"),
3251                    Quantity::from("1.0"),
3252                    0,
3253                ),
3254                0,
3255                0,
3256                ts_init,
3257                ts_init,
3258            ),
3259            OrderBookDelta::new(
3260                instrument_id,
3261                BookAction::Add,
3262                BookOrder::new(
3263                    OrderSide::Sell,
3264                    Price::from("101.00"),
3265                    Quantity::from("5.0"),
3266                    0,
3267                ),
3268                0,
3269                0,
3270                ts_init,
3271                ts_init,
3272            ),
3273        ];
3274        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3275            .unwrap();
3276
3277        // Create crossed orderbook: bid @ 102.00 (size 2.0) < ask @ 101.00 (size 5.0)
3278        let crossed_deltas = vec![OrderBookDelta::new(
3279            instrument_id,
3280            BookAction::Add,
3281            BookOrder::new(
3282                OrderSide::Buy,
3283                Price::from("102.00"),
3284                Quantity::from("2.0"),
3285                0,
3286            ),
3287            0,
3288            0,
3289            ts_init,
3290            ts_init,
3291        )];
3292        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3293
3294        let resolved =
3295            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3296                .unwrap();
3297
3298        // Verify resolution: bid @ 102.00 should be deleted, ask @ 101.00 reduced
3299        assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3300        assert!(book.best_ask_size().unwrap().as_f64() < 5.0); // Reduced from original
3301        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level remains
3302        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); // No longer crossed
3303
3304        // Verify synthetic deltas were generated
3305        assert!(resolved.deltas.len() > 1);
3306    }
3307
3308    #[rstest]
3309    fn test_resolve_crossed_order_book_equal_sizes() {
3310        // Test scenario: bid_size == ask_size
3311        // Expected: DELETE both bid and ask
3312        let instrument = create_test_instrument_any();
3313        let instrument_id = instrument.id();
3314        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3315        let ts_init = get_atomic_clock_realtime().get_time_ns();
3316
3317        // Create initial non-crossed book with multiple levels
3318        let initial_deltas = vec![
3319            OrderBookDelta::new(
3320                instrument_id,
3321                BookAction::Add,
3322                BookOrder::new(
3323                    OrderSide::Buy,
3324                    Price::from("99.00"),
3325                    Quantity::from("1.0"),
3326                    0,
3327                ),
3328                0,
3329                0,
3330                ts_init,
3331                ts_init,
3332            ),
3333            OrderBookDelta::new(
3334                instrument_id,
3335                BookAction::Add,
3336                BookOrder::new(
3337                    OrderSide::Sell,
3338                    Price::from("101.00"),
3339                    Quantity::from("3.0"),
3340                    0,
3341                ),
3342                0,
3343                0,
3344                ts_init,
3345                ts_init,
3346            ),
3347        ];
3348        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3349            .unwrap();
3350
3351        // Create crossed orderbook: bid @ 102.00 (size 3.0) == ask @ 101.00 (size 3.0)
3352        let crossed_deltas = vec![OrderBookDelta::new(
3353            instrument_id,
3354            BookAction::Add,
3355            BookOrder::new(
3356                OrderSide::Buy,
3357                Price::from("102.00"),
3358                Quantity::from("3.0"),
3359                0,
3360            ),
3361            0,
3362            0,
3363            ts_init,
3364            ts_init,
3365        )];
3366        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3367
3368        let resolved =
3369            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3370                .unwrap();
3371
3372        // Verify resolution: both crossed levels should be deleted, reverting to deeper levels
3373        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level
3374        // Ask at 101.00 should be deleted, book may be empty on ask side or have deeper levels
3375        if let Some(ask_price) = book.best_ask_price() {
3376            assert!(ask_price > book.best_bid_price().unwrap()); // No longer crossed
3377        }
3378
3379        // Verify synthetic deltas were generated
3380        assert!(resolved.deltas.len() > 1);
3381    }
3382
3383    #[rstest]
3384    fn test_resolve_crossed_order_book_multiple_iterations() {
3385        // Test scenario: multiple crossed levels requiring multiple iterations
3386        let instrument = create_test_instrument_any();
3387        let instrument_id = instrument.id();
3388        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3389        let ts_init = get_atomic_clock_realtime().get_time_ns();
3390
3391        // Create initial book with multiple levels on both sides
3392        let initial_deltas = vec![
3393            OrderBookDelta::new(
3394                instrument_id,
3395                BookAction::Add,
3396                BookOrder::new(
3397                    OrderSide::Buy,
3398                    Price::from("98.00"),
3399                    Quantity::from("1.0"),
3400                    0,
3401                ),
3402                0,
3403                0,
3404                ts_init,
3405                ts_init,
3406            ),
3407            OrderBookDelta::new(
3408                instrument_id,
3409                BookAction::Add,
3410                BookOrder::new(
3411                    OrderSide::Sell,
3412                    Price::from("100.00"),
3413                    Quantity::from("1.0"),
3414                    0,
3415                ),
3416                0,
3417                0,
3418                ts_init,
3419                ts_init,
3420            ),
3421            OrderBookDelta::new(
3422                instrument_id,
3423                BookAction::Add,
3424                BookOrder::new(
3425                    OrderSide::Sell,
3426                    Price::from("101.00"),
3427                    Quantity::from("1.0"),
3428                    0,
3429                ),
3430                0,
3431                0,
3432                ts_init,
3433                ts_init,
3434            ),
3435        ];
3436        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3437            .unwrap();
3438
3439        // Create heavily crossed orderbook with multiple bids above asks
3440        let crossed_deltas = vec![
3441            OrderBookDelta::new(
3442                instrument_id,
3443                BookAction::Add,
3444                BookOrder::new(
3445                    OrderSide::Buy,
3446                    Price::from("102.00"),
3447                    Quantity::from("1.0"),
3448                    0,
3449                ),
3450                0,
3451                0,
3452                ts_init,
3453                ts_init,
3454            ),
3455            OrderBookDelta::new(
3456                instrument_id,
3457                BookAction::Add,
3458                BookOrder::new(
3459                    OrderSide::Buy,
3460                    Price::from("103.00"),
3461                    Quantity::from("1.0"),
3462                    0,
3463                ),
3464                0,
3465                0,
3466                ts_init,
3467                ts_init,
3468            ),
3469        ];
3470        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3471
3472        let resolved =
3473            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3474                .unwrap();
3475
3476        // Verify final state is uncrossed (or book has no asks left)
3477        if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3478            assert!(ask_price > bid_price, "Book should be uncrossed");
3479        }
3480
3481        // Verify multiple synthetic deltas were generated for multiple iterations
3482        assert!(resolved.deltas.len() > 2); // Original deltas + multiple resolution passes
3483    }
3484
3485    #[rstest]
3486    fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3487        // Test scenario: non-crossed orderbook should pass through unchanged
3488        let instrument = create_test_instrument_any();
3489        let instrument_id = instrument.id();
3490        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3491        let ts_init = get_atomic_clock_realtime().get_time_ns();
3492
3493        // Create normal non-crossed book
3494        let initial_deltas = vec![
3495            OrderBookDelta::new(
3496                instrument_id,
3497                BookAction::Add,
3498                BookOrder::new(
3499                    OrderSide::Buy,
3500                    Price::from("99.00"),
3501                    Quantity::from("1.0"),
3502                    0,
3503                ),
3504                0,
3505                0,
3506                ts_init,
3507                ts_init,
3508            ),
3509            OrderBookDelta::new(
3510                instrument_id,
3511                BookAction::Add,
3512                BookOrder::new(
3513                    OrderSide::Sell,
3514                    Price::from("101.00"),
3515                    Quantity::from("1.0"),
3516                    0,
3517                ),
3518                0,
3519                0,
3520                ts_init,
3521                ts_init,
3522            ),
3523        ];
3524        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3525            .unwrap();
3526
3527        // Add another non-crossing level
3528        let new_deltas = vec![OrderBookDelta::new(
3529            instrument_id,
3530            BookAction::Add,
3531            BookOrder::new(
3532                OrderSide::Buy,
3533                Price::from("98.50"),
3534                Quantity::from("2.0"),
3535                0,
3536            ),
3537            0,
3538            0,
3539            ts_init,
3540            ts_init,
3541        )];
3542        let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3543
3544        let original_bid = book.best_bid_price();
3545        let original_ask = book.best_ask_price();
3546
3547        let resolved =
3548            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3549                .unwrap();
3550
3551        // Verify no resolution was needed - deltas should be original only
3552        assert_eq!(resolved.deltas.len(), new_deltas.len());
3553        assert_eq!(book.best_bid_price(), original_bid);
3554        assert_eq!(book.best_ask_price(), original_ask);
3555        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3556    }
3557
3558    #[tokio::test]
3559    async fn test_request_instruments_successful_fetch() {
3560        // Test successful fetch of all instruments
3561        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3562        set_data_event_sender(sender);
3563
3564        let client_id = ClientId::from("DYDX-TEST");
3565        let config = DydxDataClientConfig::default();
3566        let http_client = DydxHttpClient::default();
3567        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3568
3569        let request = RequestInstruments::new(
3570            None,
3571            None,
3572            Some(client_id),
3573            Some(*DYDX_VENUE),
3574            UUID4::new(),
3575            get_atomic_clock_realtime().get_time_ns(),
3576            None,
3577        );
3578
3579        // Execute request (spawns async task)
3580        assert!(client.request_instruments(&request).is_ok());
3581
3582        // Wait for response (with timeout)
3583        let timeout = tokio::time::Duration::from_secs(5);
3584        let result = tokio::time::timeout(timeout, rx.recv()).await;
3585
3586        match result {
3587            Ok(Some(DataEvent::Response(resp))) => {
3588                if let DataResponse::Instruments(inst_resp) = resp {
3589                    // Verify response structure
3590                    assert_eq!(inst_resp.correlation_id, request.request_id);
3591                    assert_eq!(inst_resp.client_id, client_id);
3592                    assert_eq!(inst_resp.venue, *DYDX_VENUE);
3593                    assert!(inst_resp.start.is_none());
3594                    assert!(inst_resp.end.is_none());
3595                    // Note: may be empty if HTTP fails, but structure should be correct
3596                }
3597            }
3598            Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3599            Ok(None) => panic!("Channel closed unexpectedly"),
3600            Err(_) => {
3601                // Timeout is acceptable if testnet is unreachable
3602                println!("Test timed out - testnet may be unreachable");
3603            }
3604        }
3605    }
3606
3607    #[tokio::test]
3608    async fn test_request_instruments_empty_response_on_http_error() {
3609        // Test empty response handling when HTTP call fails
3610        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3611        set_data_event_sender(sender);
3612
3613        let client_id = ClientId::from("DYDX-ERROR-TEST");
3614        let config = DydxDataClientConfig {
3615            base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3616            ..Default::default()
3617        };
3618        let http_client = DydxHttpClient::new(
3619            config.base_url_http.clone(),
3620            config.http_timeout_secs,
3621            config.http_proxy_url.clone(),
3622            config.is_testnet,
3623            None,
3624        )
3625        .unwrap();
3626
3627        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3628
3629        let request = RequestInstruments::new(
3630            None,
3631            None,
3632            Some(client_id),
3633            Some(*DYDX_VENUE),
3634            UUID4::new(),
3635            get_atomic_clock_realtime().get_time_ns(),
3636            None,
3637        );
3638
3639        assert!(client.request_instruments(&request).is_ok());
3640
3641        // Should receive empty response on error
3642        let timeout = tokio::time::Duration::from_secs(3);
3643        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3644            tokio::time::timeout(timeout, rx.recv()).await
3645        {
3646            assert!(
3647                resp.data.is_empty(),
3648                "Expected empty instruments on HTTP error"
3649            );
3650            assert_eq!(resp.correlation_id, request.request_id);
3651            assert_eq!(resp.client_id, client_id);
3652        }
3653    }
3654
3655    #[tokio::test]
3656    async fn test_request_instruments_caching() {
3657        // Test instrument caching after fetch
3658        setup_test_env();
3659
3660        let client_id = ClientId::from("DYDX-CACHE-TEST");
3661        let config = DydxDataClientConfig::default();
3662        let http_client = DydxHttpClient::default();
3663        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3664
3665        let initial_cache_size = client.instruments.len();
3666
3667        let request = RequestInstruments::new(
3668            None,
3669            None,
3670            Some(client_id),
3671            Some(*DYDX_VENUE),
3672            UUID4::new(),
3673            get_atomic_clock_realtime().get_time_ns(),
3674            None,
3675        );
3676
3677        assert!(client.request_instruments(&request).is_ok());
3678
3679        // Wait for async task to complete
3680        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3681
3682        // Verify cache populated (if HTTP succeeded)
3683        let final_cache_size = client.instruments.len();
3684        // Cache should be unchanged (empty) if HTTP failed, or populated if succeeded
3685        // We can't assert exact size without mocking, but can verify no panic
3686        assert!(final_cache_size >= initial_cache_size);
3687    }
3688
3689    #[tokio::test]
3690    async fn test_request_instruments_correlation_id_matching() {
3691        // Test correlation_id matching in response
3692        setup_test_env();
3693
3694        let client_id = ClientId::from("DYDX-CORR-TEST");
3695        let config = DydxDataClientConfig::default();
3696        let http_client = DydxHttpClient::default();
3697        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3698
3699        let request_id = UUID4::new();
3700        let request = RequestInstruments::new(
3701            None,
3702            None,
3703            Some(client_id),
3704            Some(*DYDX_VENUE),
3705            request_id,
3706            get_atomic_clock_realtime().get_time_ns(),
3707            None,
3708        );
3709
3710        // Should execute without panic (actual correlation checked in async handler)
3711        assert!(client.request_instruments(&request).is_ok());
3712    }
3713
3714    #[tokio::test]
3715    async fn test_request_instruments_venue_assignment() {
3716        // Test venue assignment
3717        setup_test_env();
3718
3719        let client_id = ClientId::from("DYDX-VENUE-TEST");
3720        let config = DydxDataClientConfig::default();
3721        let http_client = DydxHttpClient::default();
3722        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3723
3724        assert_eq!(client.venue(), *DYDX_VENUE);
3725
3726        let request = RequestInstruments::new(
3727            None,
3728            None,
3729            Some(client_id),
3730            Some(*DYDX_VENUE),
3731            UUID4::new(),
3732            get_atomic_clock_realtime().get_time_ns(),
3733            None,
3734        );
3735
3736        assert!(client.request_instruments(&request).is_ok());
3737    }
3738
3739    #[tokio::test]
3740    async fn test_request_instruments_timestamp_handling() {
3741        // Test timestamp handling (start_nanos, end_nanos)
3742        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3743        set_data_event_sender(sender);
3744
3745        let client_id = ClientId::from("DYDX-TS-TEST");
3746        let config = DydxDataClientConfig::default();
3747        let http_client = DydxHttpClient::default();
3748        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3749
3750        let now = Utc::now();
3751        let start = Some(now - chrono::Duration::hours(24));
3752        let end = Some(now);
3753
3754        let request = RequestInstruments::new(
3755            start,
3756            end,
3757            Some(client_id),
3758            Some(*DYDX_VENUE),
3759            UUID4::new(),
3760            get_atomic_clock_realtime().get_time_ns(),
3761            None,
3762        );
3763
3764        assert!(client.request_instruments(&request).is_ok());
3765
3766        // Wait for response
3767        let timeout = tokio::time::Duration::from_secs(3);
3768        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3769            tokio::time::timeout(timeout, rx.recv()).await
3770        {
3771            // Verify timestamps are set
3772            assert!(resp.start.unwrap() > 0);
3773            assert!(resp.end.unwrap() > 0);
3774            assert!(resp.start.unwrap() <= resp.end.unwrap());
3775            assert!(resp.ts_init > 0);
3776        }
3777    }
3778
3779    #[tokio::test]
3780    async fn test_request_instruments_with_start_only() {
3781        // Test timestamp handling when only `start` is provided
3782        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3783        set_data_event_sender(sender);
3784
3785        let client_id = ClientId::from("DYDX-TS-START-ONLY");
3786        let config = DydxDataClientConfig::default();
3787        let http_client = DydxHttpClient::default();
3788        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3789
3790        let now = Utc::now();
3791        let start = Some(now - chrono::Duration::hours(24));
3792
3793        let request = RequestInstruments::new(
3794            start,
3795            None,
3796            Some(client_id),
3797            Some(*DYDX_VENUE),
3798            UUID4::new(),
3799            get_atomic_clock_realtime().get_time_ns(),
3800            None,
3801        );
3802
3803        assert!(client.request_instruments(&request).is_ok());
3804
3805        let timeout = tokio::time::Duration::from_secs(3);
3806        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3807            tokio::time::timeout(timeout, rx.recv()).await
3808        {
3809            assert!(resp.start.is_some());
3810            assert!(resp.end.is_none());
3811            assert!(resp.ts_init > 0);
3812        }
3813    }
3814
3815    #[tokio::test]
3816    async fn test_request_instruments_with_end_only() {
3817        // Test timestamp handling when only `end` is provided
3818        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3819        set_data_event_sender(sender);
3820
3821        let client_id = ClientId::from("DYDX-TS-END-ONLY");
3822        let config = DydxDataClientConfig::default();
3823        let http_client = DydxHttpClient::default();
3824        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3825
3826        let now = Utc::now();
3827        let end = Some(now);
3828
3829        let request = RequestInstruments::new(
3830            None,
3831            end,
3832            Some(client_id),
3833            Some(*DYDX_VENUE),
3834            UUID4::new(),
3835            get_atomic_clock_realtime().get_time_ns(),
3836            None,
3837        );
3838
3839        assert!(client.request_instruments(&request).is_ok());
3840
3841        let timeout = tokio::time::Duration::from_secs(3);
3842        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3843            tokio::time::timeout(timeout, rx.recv()).await
3844        {
3845            assert!(resp.start.is_none());
3846            assert!(resp.end.is_some());
3847            assert!(resp.ts_init > 0);
3848        }
3849    }
3850
3851    #[tokio::test]
3852    async fn test_request_instruments_client_id_fallback() {
3853        // Test client_id fallback to default when not provided
3854        setup_test_env();
3855
3856        let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3857        let config = DydxDataClientConfig::default();
3858        let http_client = DydxHttpClient::default();
3859        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3860
3861        let request = RequestInstruments::new(
3862            None,
3863            None,
3864            None, // No client_id provided
3865            Some(*DYDX_VENUE),
3866            UUID4::new(),
3867            get_atomic_clock_realtime().get_time_ns(),
3868            None,
3869        );
3870
3871        // Should use client's default client_id
3872        assert!(client.request_instruments(&request).is_ok());
3873    }
3874
3875    #[tokio::test]
3876    async fn test_request_instruments_with_params() {
3877        // Test custom params handling
3878        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3879        set_data_event_sender(sender);
3880
3881        let client_id = ClientId::from("DYDX-PARAMS-TEST");
3882        let config = DydxDataClientConfig::default();
3883        let http_client = DydxHttpClient::default();
3884        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3885
3886        // Create params - just verify they're passed through
3887        let mut params_map = IndexMap::new();
3888        params_map.insert("test_key".to_string(), "test_value".to_string());
3889
3890        let request = RequestInstruments::new(
3891            None,
3892            None,
3893            Some(client_id),
3894            Some(*DYDX_VENUE),
3895            UUID4::new(),
3896            get_atomic_clock_realtime().get_time_ns(),
3897            Some(params_map),
3898        );
3899
3900        assert!(client.request_instruments(&request).is_ok());
3901
3902        // Wait for response
3903        let timeout = tokio::time::Duration::from_secs(3);
3904        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3905            tokio::time::timeout(timeout, rx.recv()).await
3906        {
3907            // Verify params are propagated into the response
3908            assert_eq!(resp.client_id, client_id);
3909            let params = resp
3910                .params
3911                .expect("expected params to be present in InstrumentsResponse");
3912            assert_eq!(
3913                params.get("test_key").map(String::as_str),
3914                Some("test_value")
3915            );
3916        }
3917    }
3918
3919    #[tokio::test]
3920    async fn test_request_instruments_with_start_and_end_range() {
3921        // Test timestamp handling when both start and end are provided
3922        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3923        set_data_event_sender(sender);
3924
3925        let client_id = ClientId::from("DYDX-START-END-RANGE");
3926        let config = DydxDataClientConfig::default();
3927        let http_client = DydxHttpClient::default();
3928        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3929
3930        let now = Utc::now();
3931        let start = Some(now - chrono::Duration::hours(48));
3932        let end = Some(now - chrono::Duration::hours(24));
3933
3934        let request = RequestInstruments::new(
3935            start,
3936            end,
3937            Some(client_id),
3938            Some(*DYDX_VENUE),
3939            UUID4::new(),
3940            get_atomic_clock_realtime().get_time_ns(),
3941            None,
3942        );
3943
3944        assert!(client.request_instruments(&request).is_ok());
3945
3946        let timeout = tokio::time::Duration::from_secs(3);
3947        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3948            tokio::time::timeout(timeout, rx.recv()).await
3949        {
3950            // Verify both timestamps are present
3951            assert!(
3952                resp.start.is_some(),
3953                "start timestamp should be present when provided"
3954            );
3955            assert!(
3956                resp.end.is_some(),
3957                "end timestamp should be present when provided"
3958            );
3959            assert!(resp.ts_init > 0, "ts_init should always be set");
3960
3961            // Verify start is before end
3962            if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3963                assert!(
3964                    start_ts < end_ts,
3965                    "start timestamp should be before end timestamp"
3966                );
3967            }
3968        }
3969    }
3970
3971    #[tokio::test]
3972    async fn test_request_instruments_different_client_ids() {
3973        // Test that different client_id values are properly handled using a shared channel.
3974        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3975        set_data_event_sender(sender);
3976
3977        let timeout = tokio::time::Duration::from_secs(3);
3978
3979        // First client
3980        let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3981        let config1 = DydxDataClientConfig::default();
3982        let http_client1 = DydxHttpClient::default();
3983        let client1 = DydxDataClient::new(client_id_1, config1, http_client1, None).unwrap();
3984
3985        let request1 = RequestInstruments::new(
3986            None,
3987            None,
3988            Some(client_id_1),
3989            Some(*DYDX_VENUE),
3990            UUID4::new(),
3991            get_atomic_clock_realtime().get_time_ns(),
3992            None,
3993        );
3994
3995        assert!(client1.request_instruments(&request1).is_ok());
3996
3997        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
3998            tokio::time::timeout(timeout, rx.recv()).await
3999        {
4000            assert_eq!(
4001                resp1.client_id, client_id_1,
4002                "Response should contain client_id_1"
4003            );
4004        }
4005
4006        // Second client
4007        let client_id_2 = ClientId::from("DYDX-CLIENT-2");
4008        let config2 = DydxDataClientConfig::default();
4009        let http_client2 = DydxHttpClient::default();
4010        let client2 = DydxDataClient::new(client_id_2, config2, http_client2, None).unwrap();
4011
4012        let request2 = RequestInstruments::new(
4013            None,
4014            None,
4015            Some(client_id_2),
4016            Some(*DYDX_VENUE),
4017            UUID4::new(),
4018            get_atomic_clock_realtime().get_time_ns(),
4019            None,
4020        );
4021
4022        assert!(client2.request_instruments(&request2).is_ok());
4023
4024        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
4025            tokio::time::timeout(timeout, rx.recv()).await
4026        {
4027            assert_eq!(
4028                resp2.client_id, client_id_2,
4029                "Response should contain client_id_2"
4030            );
4031            assert_ne!(
4032                resp2.client_id, client_id_1,
4033                "Different clients should have different client_ids"
4034            );
4035        }
4036    }
4037
4038    #[tokio::test]
4039    async fn test_request_instruments_no_timestamps() {
4040        // Test fetching all current instruments (no start/end filters)
4041        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4042        set_data_event_sender(sender);
4043
4044        let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
4045        let config = DydxDataClientConfig::default();
4046        let http_client = DydxHttpClient::default();
4047        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4048
4049        let request = RequestInstruments::new(
4050            None, // No start filter
4051            None, // No end filter
4052            Some(client_id),
4053            Some(*DYDX_VENUE),
4054            UUID4::new(),
4055            get_atomic_clock_realtime().get_time_ns(),
4056            None,
4057        );
4058
4059        assert!(client.request_instruments(&request).is_ok());
4060
4061        let timeout = tokio::time::Duration::from_secs(5);
4062        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4063            tokio::time::timeout(timeout, rx.recv()).await
4064        {
4065            // Verify no timestamp filters
4066            assert!(
4067                resp.start.is_none(),
4068                "start should be None when not provided"
4069            );
4070            assert!(resp.end.is_none(), "end should be None when not provided");
4071
4072            // Should still get current instruments
4073            assert_eq!(resp.venue, *DYDX_VENUE);
4074            assert_eq!(resp.client_id, client_id);
4075            assert!(resp.ts_init > 0);
4076        }
4077    }
4078
4079    #[tokio::test]
4080    async fn test_request_instrument_cache_hit() {
4081        // Test cache hit (instrument already cached)
4082        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4083        set_data_event_sender(sender);
4084
4085        let client_id = ClientId::from("DYDX-CACHE-HIT");
4086        let config = DydxDataClientConfig::default();
4087        let http_client = DydxHttpClient::default();
4088        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4089
4090        // Pre-populate cache with test instrument
4091        let instrument = create_test_instrument_any();
4092        let instrument_id = instrument.id();
4093        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4094        client.instruments.insert(symbol_key, instrument.clone());
4095
4096        let request = RequestInstrument::new(
4097            instrument_id,
4098            None,
4099            None,
4100            Some(client_id),
4101            UUID4::new(),
4102            get_atomic_clock_realtime().get_time_ns(),
4103            None,
4104        );
4105
4106        assert!(client.request_instrument(&request).is_ok());
4107
4108        // Should get immediate response from cache
4109        let timeout = tokio::time::Duration::from_millis(500);
4110        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4111            tokio::time::timeout(timeout, rx.recv()).await
4112        {
4113            assert_eq!(resp.instrument_id, instrument_id);
4114            assert_eq!(resp.client_id, client_id);
4115            assert_eq!(resp.data.id(), instrument_id);
4116        }
4117    }
4118
4119    #[tokio::test]
4120    async fn test_request_instrument_cache_miss() {
4121        // Test cache miss (fetch from API)
4122        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4123        set_data_event_sender(sender);
4124
4125        let client_id = ClientId::from("DYDX-CACHE-MISS");
4126        let config = DydxDataClientConfig::default();
4127        let http_client = DydxHttpClient::default();
4128        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4129
4130        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4131
4132        let request = RequestInstrument::new(
4133            instrument_id,
4134            None,
4135            None,
4136            Some(client_id),
4137            UUID4::new(),
4138            get_atomic_clock_realtime().get_time_ns(),
4139            None,
4140        );
4141
4142        assert!(client.request_instrument(&request).is_ok());
4143
4144        // Wait for async HTTP fetch and response
4145        let timeout = tokio::time::Duration::from_secs(5);
4146        let result = tokio::time::timeout(timeout, rx.recv()).await;
4147
4148        // May timeout if testnet unreachable, but should not panic
4149        match result {
4150            Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4151                assert_eq!(resp.instrument_id, instrument_id);
4152                assert_eq!(resp.client_id, client_id);
4153            }
4154            Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4155            Ok(None) => panic!("Channel closed unexpectedly"),
4156            Err(_) => {
4157                println!("Test timed out - testnet may be unreachable");
4158            }
4159        }
4160    }
4161
4162    #[tokio::test]
4163    async fn test_request_instrument_not_found() {
4164        // Test instrument not found scenario
4165        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4166        set_data_event_sender(sender);
4167
4168        let client_id = ClientId::from("DYDX-NOT-FOUND");
4169        let config = DydxDataClientConfig {
4170            base_url_http: Some("http://invalid-url.local".to_string()),
4171            ..Default::default()
4172        };
4173        let http_client = DydxHttpClient::new(
4174            config.base_url_http.clone(),
4175            config.http_timeout_secs,
4176            config.http_proxy_url.clone(),
4177            config.is_testnet,
4178            None,
4179        )
4180        .unwrap();
4181
4182        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4183
4184        let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4185
4186        let request = RequestInstrument::new(
4187            instrument_id,
4188            None,
4189            None,
4190            Some(client_id),
4191            UUID4::new(),
4192            get_atomic_clock_realtime().get_time_ns(),
4193            None,
4194        );
4195
4196        // Should not panic on invalid instrument
4197        assert!(client.request_instrument(&request).is_ok());
4198
4199        // Note: No response sent when instrument not found (by design)
4200        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4201    }
4202
4203    #[tokio::test]
4204    async fn test_request_instrument_bulk_caching() {
4205        // Test bulk caching when fetching from API
4206        setup_test_env();
4207
4208        let client_id = ClientId::from("DYDX-BULK-CACHE");
4209        let config = DydxDataClientConfig::default();
4210        let http_client = DydxHttpClient::default();
4211        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4212
4213        let initial_cache_size = client.instruments.len();
4214
4215        let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4216
4217        let request = RequestInstrument::new(
4218            instrument_id,
4219            None,
4220            None,
4221            Some(client_id),
4222            UUID4::new(),
4223            get_atomic_clock_realtime().get_time_ns(),
4224            None,
4225        );
4226
4227        assert!(client.request_instrument(&request).is_ok());
4228
4229        // Wait for async bulk fetch
4230        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4231
4232        // Verify cache populated with all instruments (if HTTP succeeded)
4233        let final_cache_size = client.instruments.len();
4234        assert!(final_cache_size >= initial_cache_size);
4235        // If HTTP succeeded, cache should have multiple instruments
4236    }
4237
4238    #[tokio::test]
4239    async fn test_request_instrument_correlation_id() {
4240        // Test correlation_id matching
4241        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4242        set_data_event_sender(sender);
4243
4244        let client_id = ClientId::from("DYDX-CORR-ID");
4245        let config = DydxDataClientConfig::default();
4246        let http_client = DydxHttpClient::default();
4247        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4248
4249        // Pre-populate cache to get immediate response
4250        let instrument = create_test_instrument_any();
4251        let instrument_id = instrument.id();
4252        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4253        client.instruments.insert(symbol_key, instrument.clone());
4254
4255        let request_id = UUID4::new();
4256        let request = RequestInstrument::new(
4257            instrument_id,
4258            None,
4259            None,
4260            Some(client_id),
4261            request_id,
4262            get_atomic_clock_realtime().get_time_ns(),
4263            None,
4264        );
4265
4266        assert!(client.request_instrument(&request).is_ok());
4267
4268        // Verify correlation_id matches
4269        let timeout = tokio::time::Duration::from_millis(500);
4270        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4271            tokio::time::timeout(timeout, rx.recv()).await
4272        {
4273            assert_eq!(resp.correlation_id, request_id);
4274        }
4275    }
4276
4277    #[tokio::test]
4278    async fn test_request_instrument_response_format_boxed() {
4279        // Verify InstrumentResponse format (boxed)
4280        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4281        set_data_event_sender(sender);
4282
4283        let client_id = ClientId::from("DYDX-BOXED");
4284        let config = DydxDataClientConfig::default();
4285        let http_client = DydxHttpClient::default();
4286        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4287
4288        // Pre-populate cache
4289        let instrument = create_test_instrument_any();
4290        let instrument_id = instrument.id();
4291        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4292        client.instruments.insert(symbol_key, instrument.clone());
4293
4294        let request = RequestInstrument::new(
4295            instrument_id,
4296            None,
4297            None,
4298            Some(client_id),
4299            UUID4::new(),
4300            get_atomic_clock_realtime().get_time_ns(),
4301            None,
4302        );
4303
4304        assert!(client.request_instrument(&request).is_ok());
4305
4306        // Verify response is properly boxed
4307        let timeout = tokio::time::Duration::from_millis(500);
4308        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4309            tokio::time::timeout(timeout, rx.recv()).await
4310        {
4311            // Verify boxed response structure
4312            assert_eq!(boxed_resp.instrument_id, instrument_id);
4313            assert_eq!(boxed_resp.client_id, client_id);
4314            assert!(boxed_resp.start.is_none());
4315            assert!(boxed_resp.end.is_none());
4316            assert!(boxed_resp.ts_init > 0);
4317        }
4318    }
4319
4320    #[rstest]
4321    fn test_request_instrument_symbol_extraction() {
4322        // Test symbol extraction from InstrumentId
4323        setup_test_env();
4324
4325        let client_id = ClientId::from("DYDX-SYMBOL");
4326        let config = DydxDataClientConfig::default();
4327        let http_client = DydxHttpClient::default();
4328        let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4329
4330        // Test various instrument ID formats
4331        // Note: Symbol includes the -PERP suffix in dYdX
4332        let test_cases = vec![
4333            ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4334            ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4335            ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4336        ];
4337
4338        for (instrument_id_str, expected_symbol) in test_cases {
4339            let instrument_id = InstrumentId::from(instrument_id_str);
4340            let symbol = Ustr::from(instrument_id.symbol.as_str());
4341            assert_eq!(symbol.as_str(), expected_symbol);
4342        }
4343    }
4344
4345    #[tokio::test]
4346    async fn test_request_instrument_client_id_fallback() {
4347        // Test client_id fallback to default
4348        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4349        set_data_event_sender(sender);
4350
4351        let client_id = ClientId::from("DYDX-FALLBACK");
4352        let config = DydxDataClientConfig::default();
4353        let http_client = DydxHttpClient::default();
4354        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4355
4356        // Pre-populate cache
4357        let instrument = create_test_instrument_any();
4358        let instrument_id = instrument.id();
4359        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4360        client.instruments.insert(symbol_key, instrument.clone());
4361
4362        let request = RequestInstrument::new(
4363            instrument_id,
4364            None,
4365            None,
4366            None, // No client_id provided
4367            UUID4::new(),
4368            get_atomic_clock_realtime().get_time_ns(),
4369            None,
4370        );
4371
4372        assert!(client.request_instrument(&request).is_ok());
4373
4374        // Should use client's default client_id
4375        let timeout = tokio::time::Duration::from_millis(500);
4376        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4377            tokio::time::timeout(timeout, rx.recv()).await
4378        {
4379            assert_eq!(resp.client_id, client_id);
4380        }
4381    }
4382
4383    #[tokio::test]
4384    async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4385        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4386        set_data_event_sender(sender);
4387
4388        let created_at = Utc::now();
4389
4390        let http_trade = crate::http::models::Trade {
4391            id: "trade-1".to_string(),
4392            side: OrderSide::Buy,
4393            size: dec!(1.5),
4394            price: dec!(100.25),
4395            created_at,
4396            created_at_height: 1,
4397            trade_type: crate::common::enums::DydxTradeType::Limit,
4398        };
4399
4400        let trades_response = crate::http::models::TradesResponse {
4401            trades: vec![http_trade],
4402        };
4403
4404        let state = TradesTestState {
4405            response: Arc::new(trades_response),
4406            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4407            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4408        };
4409
4410        let addr = start_trades_test_server(state.clone()).await;
4411        let base_url = format!("http://{addr}");
4412
4413        let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4414        let config = DydxDataClientConfig {
4415            base_url_http: Some(base_url),
4416            is_testnet: true,
4417            ..Default::default()
4418        };
4419
4420        let http_client = DydxHttpClient::new(
4421            config.base_url_http.clone(),
4422            config.http_timeout_secs,
4423            config.http_proxy_url.clone(),
4424            config.is_testnet,
4425            None,
4426        )
4427        .unwrap();
4428
4429        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4430
4431        let instrument = create_test_instrument_any();
4432        let instrument_id = instrument.id();
4433        let price_precision = instrument.price_precision();
4434        let size_precision = instrument.size_precision();
4435        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4436        client.instruments.insert(symbol_key, instrument);
4437
4438        let request_id = UUID4::new();
4439        let now = Utc::now();
4440        let start = Some(now - chrono::Duration::seconds(10));
4441        let end = Some(now + chrono::Duration::seconds(10));
4442        let limit = std::num::NonZeroUsize::new(100).unwrap();
4443
4444        let request = RequestTrades::new(
4445            instrument_id,
4446            start,
4447            end,
4448            Some(limit),
4449            Some(client_id),
4450            request_id,
4451            get_atomic_clock_realtime().get_time_ns(),
4452            None,
4453        );
4454
4455        assert!(client.request_trades(&request).is_ok());
4456
4457        let timeout = tokio::time::Duration::from_secs(1);
4458        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4459            tokio::time::timeout(timeout, rx.recv()).await
4460        {
4461            assert_eq!(resp.correlation_id, request_id);
4462            assert_eq!(resp.client_id, client_id);
4463            assert_eq!(resp.instrument_id, instrument_id);
4464            assert_eq!(resp.data.len(), 1);
4465
4466            let tick = &resp.data[0];
4467            assert_eq!(tick.instrument_id, instrument_id);
4468            assert_eq!(tick.price, Price::new(100.25, price_precision));
4469            assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4470            assert_eq!(tick.trade_id.to_string(), "trade-1");
4471
4472            use nautilus_model::enums::AggressorSide;
4473            assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4474        } else {
4475            panic!("did not receive trades response in time");
4476        }
4477
4478        // Verify symbol conversion (strip -PERP suffix) and limit propagation.
4479        let last_ticker = state.last_ticker.lock().await.clone();
4480        assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4481
4482        let last_limit = *state.last_limit.lock().await;
4483        assert_eq!(last_limit, Some(Some(100)));
4484    }
4485
4486    #[tokio::test]
4487    async fn test_request_trades_empty_response_and_no_limit() {
4488        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4489        set_data_event_sender(sender);
4490
4491        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4492
4493        let state = TradesTestState {
4494            response: Arc::new(trades_response),
4495            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4496            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4497        };
4498
4499        let addr = start_trades_test_server(state.clone()).await;
4500        let base_url = format!("http://{addr}");
4501
4502        let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4503        let config = DydxDataClientConfig {
4504            base_url_http: Some(base_url),
4505            is_testnet: true,
4506            ..Default::default()
4507        };
4508
4509        let http_client = DydxHttpClient::new(
4510            config.base_url_http.clone(),
4511            config.http_timeout_secs,
4512            config.http_proxy_url.clone(),
4513            config.is_testnet,
4514            None,
4515        )
4516        .unwrap();
4517
4518        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4519
4520        let instrument = create_test_instrument_any();
4521        let instrument_id = instrument.id();
4522        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4523        client.instruments.insert(symbol_key, instrument);
4524
4525        let request_id = UUID4::new();
4526
4527        let request = RequestTrades::new(
4528            instrument_id,
4529            None,
4530            None,
4531            None, // No limit
4532            Some(client_id),
4533            request_id,
4534            get_atomic_clock_realtime().get_time_ns(),
4535            None,
4536        );
4537
4538        assert!(client.request_trades(&request).is_ok());
4539
4540        let timeout = tokio::time::Duration::from_secs(1);
4541        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4542            tokio::time::timeout(timeout, rx.recv()).await
4543        {
4544            assert_eq!(resp.correlation_id, request_id);
4545            assert_eq!(resp.client_id, client_id);
4546            assert_eq!(resp.instrument_id, instrument_id);
4547            assert!(resp.data.is_empty());
4548        } else {
4549            panic!("did not receive trades response in time");
4550        }
4551
4552        // Verify that no `limit` query parameter was sent.
4553        let last_limit = *state.last_limit.lock().await;
4554        assert_eq!(last_limit, Some(None));
4555    }
4556
4557    #[tokio::test]
4558    async fn test_request_trades_timestamp_filtering() {
4559        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4560        set_data_event_sender(sender);
4561
4562        let now = Utc::now();
4563        let trade_before = crate::http::models::Trade {
4564            id: "before".to_string(),
4565            side: OrderSide::Buy,
4566            size: dec!(1.0),
4567            price: dec!(100.0),
4568            created_at: now - chrono::Duration::seconds(60),
4569            created_at_height: 1,
4570            trade_type: crate::common::enums::DydxTradeType::Limit,
4571        };
4572        let trade_inside = crate::http::models::Trade {
4573            id: "inside".to_string(),
4574            side: OrderSide::Sell,
4575            size: dec!(2.0),
4576            price: dec!(101.0),
4577            created_at: now,
4578            created_at_height: 2,
4579            trade_type: crate::common::enums::DydxTradeType::Limit,
4580        };
4581        let trade_after = crate::http::models::Trade {
4582            id: "after".to_string(),
4583            side: OrderSide::Buy,
4584            size: dec!(3.0),
4585            price: dec!(102.0),
4586            created_at: now + chrono::Duration::seconds(60),
4587            created_at_height: 3,
4588            trade_type: crate::common::enums::DydxTradeType::Limit,
4589        };
4590
4591        let trades_response = crate::http::models::TradesResponse {
4592            trades: vec![trade_before, trade_inside.clone(), trade_after],
4593        };
4594
4595        let state = TradesTestState {
4596            response: Arc::new(trades_response),
4597            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4598            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4599        };
4600
4601        let addr = start_trades_test_server(state).await;
4602        let base_url = format!("http://{addr}");
4603
4604        let client_id = ClientId::from("DYDX-TRADES-FILTER");
4605        let config = DydxDataClientConfig {
4606            base_url_http: Some(base_url),
4607            is_testnet: true,
4608            ..Default::default()
4609        };
4610
4611        let http_client = DydxHttpClient::new(
4612            config.base_url_http.clone(),
4613            config.http_timeout_secs,
4614            config.http_proxy_url.clone(),
4615            config.is_testnet,
4616            None,
4617        )
4618        .unwrap();
4619
4620        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4621
4622        let instrument = create_test_instrument_any();
4623        let instrument_id = instrument.id();
4624        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4625        client.instruments.insert(symbol_key, instrument);
4626
4627        let request_id = UUID4::new();
4628
4629        // Filter range includes only the "inside" trade.
4630        let start = Some(now - chrono::Duration::seconds(10));
4631        let end = Some(now + chrono::Duration::seconds(10));
4632
4633        let request = RequestTrades::new(
4634            instrument_id,
4635            start,
4636            end,
4637            None,
4638            Some(client_id),
4639            request_id,
4640            get_atomic_clock_realtime().get_time_ns(),
4641            None,
4642        );
4643
4644        assert!(client.request_trades(&request).is_ok());
4645
4646        let timeout = tokio::time::Duration::from_secs(1);
4647        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4648            tokio::time::timeout(timeout, rx.recv()).await
4649        {
4650            assert_eq!(resp.correlation_id, request_id);
4651            assert_eq!(resp.client_id, client_id);
4652            assert_eq!(resp.instrument_id, instrument_id);
4653            assert_eq!(resp.data.len(), 1);
4654
4655            let tick = &resp.data[0];
4656            assert_eq!(tick.trade_id.to_string(), "inside");
4657            assert_eq!(tick.price.as_decimal(), dec!(101.0));
4658        } else {
4659            panic!("did not receive trades response in time");
4660        }
4661    }
4662
4663    #[tokio::test]
4664    async fn test_request_trades_correlation_id_matching() {
4665        // Test correlation_id matching in response
4666        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4667        set_data_event_sender(sender);
4668
4669        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4670
4671        let state = TradesTestState {
4672            response: Arc::new(trades_response),
4673            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4674            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4675        };
4676
4677        let addr = start_trades_test_server(state).await;
4678        let base_url = format!("http://{addr}");
4679
4680        let client_id = ClientId::from("DYDX-TRADES-CORR");
4681        let config = DydxDataClientConfig {
4682            base_url_http: Some(base_url),
4683            is_testnet: true,
4684            ..Default::default()
4685        };
4686
4687        let http_client = DydxHttpClient::new(
4688            config.base_url_http.clone(),
4689            config.http_timeout_secs,
4690            config.http_proxy_url.clone(),
4691            config.is_testnet,
4692            None,
4693        )
4694        .unwrap();
4695
4696        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4697
4698        let instrument = create_test_instrument_any();
4699        let instrument_id = instrument.id();
4700        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4701        client.instruments.insert(symbol_key, instrument);
4702
4703        let request_id = UUID4::new();
4704        let request = RequestTrades::new(
4705            instrument_id,
4706            None,
4707            None,
4708            None,
4709            Some(client_id),
4710            request_id,
4711            get_atomic_clock_realtime().get_time_ns(),
4712            None,
4713        );
4714
4715        assert!(client.request_trades(&request).is_ok());
4716
4717        let timeout = tokio::time::Duration::from_millis(500);
4718        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4719            tokio::time::timeout(timeout, rx.recv()).await
4720        {
4721            assert_eq!(resp.correlation_id, request_id);
4722        }
4723    }
4724
4725    #[tokio::test]
4726    async fn test_request_trades_response_format() {
4727        // Verify TradesResponse format
4728        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4729        set_data_event_sender(sender);
4730
4731        let created_at = Utc::now();
4732        let http_trade = crate::http::models::Trade {
4733            id: "format-test".to_string(),
4734            side: OrderSide::Sell,
4735            size: dec!(5.0),
4736            price: dec!(200.0),
4737            created_at,
4738            created_at_height: 100,
4739            trade_type: crate::common::enums::DydxTradeType::Limit,
4740        };
4741
4742        let trades_response = crate::http::models::TradesResponse {
4743            trades: vec![http_trade],
4744        };
4745
4746        let state = TradesTestState {
4747            response: Arc::new(trades_response),
4748            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4749            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4750        };
4751
4752        let addr = start_trades_test_server(state).await;
4753        let base_url = format!("http://{addr}");
4754
4755        let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4756        let config = DydxDataClientConfig {
4757            base_url_http: Some(base_url),
4758            is_testnet: true,
4759            ..Default::default()
4760        };
4761
4762        let http_client = DydxHttpClient::new(
4763            config.base_url_http.clone(),
4764            config.http_timeout_secs,
4765            config.http_proxy_url.clone(),
4766            config.is_testnet,
4767            None,
4768        )
4769        .unwrap();
4770
4771        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4772
4773        let instrument = create_test_instrument_any();
4774        let instrument_id = instrument.id();
4775        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4776        client.instruments.insert(symbol_key, instrument);
4777
4778        let request = RequestTrades::new(
4779            instrument_id,
4780            None,
4781            None,
4782            None,
4783            Some(client_id),
4784            UUID4::new(),
4785            get_atomic_clock_realtime().get_time_ns(),
4786            None,
4787        );
4788
4789        assert!(client.request_trades(&request).is_ok());
4790
4791        let timeout = tokio::time::Duration::from_millis(500);
4792        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4793            tokio::time::timeout(timeout, rx.recv()).await
4794        {
4795            // Verify response structure
4796            assert_eq!(resp.client_id, client_id);
4797            assert_eq!(resp.instrument_id, instrument_id);
4798            assert!(resp.data.len() == 1);
4799            assert!(resp.ts_init > 0);
4800
4801            // Verify trade tick structure
4802            let tick = &resp.data[0];
4803            assert_eq!(tick.instrument_id, instrument_id);
4804            assert!(tick.ts_event > 0);
4805            assert!(tick.ts_init > 0);
4806        }
4807    }
4808
4809    #[tokio::test]
4810    async fn test_request_trades_no_instrument_in_cache() {
4811        // Test empty response when instrument not in cache
4812        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4813        set_data_event_sender(sender);
4814
4815        let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4816        let config = DydxDataClientConfig::default();
4817        let http_client = DydxHttpClient::default();
4818        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4819
4820        // Don't add instrument to cache
4821        let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4822
4823        let request = RequestTrades::new(
4824            instrument_id,
4825            None,
4826            None,
4827            None,
4828            Some(client_id),
4829            UUID4::new(),
4830            get_atomic_clock_realtime().get_time_ns(),
4831            None,
4832        );
4833
4834        assert!(client.request_trades(&request).is_ok());
4835
4836        // Should receive empty response when instrument not found
4837        let timeout = tokio::time::Duration::from_millis(500);
4838        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4839            tokio::time::timeout(timeout, rx.recv()).await
4840        {
4841            assert!(resp.data.is_empty());
4842        }
4843    }
4844
4845    #[tokio::test]
4846    async fn test_request_trades_limit_parameter() {
4847        // Test limit parameter handling
4848        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4849        set_data_event_sender(sender);
4850
4851        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4852
4853        let state = TradesTestState {
4854            response: Arc::new(trades_response),
4855            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4856            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4857        };
4858
4859        let addr = start_trades_test_server(state.clone()).await;
4860        let base_url = format!("http://{addr}");
4861
4862        let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4863        let config = DydxDataClientConfig {
4864            base_url_http: Some(base_url),
4865            is_testnet: true,
4866            ..Default::default()
4867        };
4868
4869        let http_client = DydxHttpClient::new(
4870            config.base_url_http.clone(),
4871            config.http_timeout_secs,
4872            config.http_proxy_url.clone(),
4873            config.is_testnet,
4874            None,
4875        )
4876        .unwrap();
4877
4878        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4879
4880        let instrument = create_test_instrument_any();
4881        let instrument_id = instrument.id();
4882        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4883        client.instruments.insert(symbol_key, instrument);
4884
4885        // Test with limit
4886        let limit = std::num::NonZeroUsize::new(500).unwrap();
4887        let request = RequestTrades::new(
4888            instrument_id,
4889            None,
4890            None,
4891            Some(limit),
4892            Some(client_id),
4893            UUID4::new(),
4894            get_atomic_clock_realtime().get_time_ns(),
4895            None,
4896        );
4897
4898        assert!(client.request_trades(&request).is_ok());
4899        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
4900
4901        // Verify limit was passed to HTTP client
4902        let last_limit = *state.last_limit.lock().await;
4903        assert_eq!(last_limit, Some(Some(500)));
4904    }
4905
4906    #[rstest]
4907    fn test_request_trades_symbol_conversion() {
4908        // Test symbol conversion (strip -PERP suffix)
4909        setup_test_env();
4910
4911        let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4912        let config = DydxDataClientConfig::default();
4913        let http_client = DydxHttpClient::default();
4914        let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4915
4916        // Verify symbol format for various instruments
4917        let test_cases = vec![
4918            ("BTC-USD-PERP.DYDX", "BTC-USD"),
4919            ("ETH-USD-PERP.DYDX", "ETH-USD"),
4920            ("SOL-USD-PERP.DYDX", "SOL-USD"),
4921        ];
4922
4923        for (instrument_id_str, expected_ticker) in test_cases {
4924            let instrument_id = InstrumentId::from(instrument_id_str);
4925            let ticker = instrument_id
4926                .symbol
4927                .as_str()
4928                .trim_end_matches("-PERP")
4929                .to_string();
4930            assert_eq!(ticker, expected_ticker);
4931        }
4932    }
4933
4934    #[tokio::test]
4935    async fn test_http_404_handling() {
4936        // Test HTTP 404 handling (instrument not found)
4937        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4938        set_data_event_sender(sender);
4939
4940        let client_id = ClientId::from("DYDX-404");
4941        let config = DydxDataClientConfig {
4942            base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4943            http_timeout_secs: Some(1),
4944            ..Default::default()
4945        };
4946
4947        let http_client = DydxHttpClient::new(
4948            config.base_url_http.clone(),
4949            config.http_timeout_secs,
4950            config.http_proxy_url.clone(),
4951            config.is_testnet,
4952            None,
4953        )
4954        .unwrap();
4955
4956        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4957
4958        let request = RequestInstruments::new(
4959            None,
4960            None,
4961            Some(client_id),
4962            Some(*DYDX_VENUE),
4963            UUID4::new(),
4964            get_atomic_clock_realtime().get_time_ns(),
4965            None,
4966        );
4967
4968        assert!(client.request_instruments(&request).is_ok());
4969
4970        // Should receive empty response on 404
4971        let timeout = tokio::time::Duration::from_secs(2);
4972        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4973            tokio::time::timeout(timeout, rx.recv()).await
4974        {
4975            assert!(resp.data.is_empty(), "Expected empty response on 404");
4976        }
4977    }
4978
4979    #[tokio::test]
4980    async fn test_http_500_handling() {
4981        // Test HTTP 500 handling (internal server error)
4982        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4983        set_data_event_sender(sender);
4984
4985        let client_id = ClientId::from("DYDX-500");
4986        let config = DydxDataClientConfig {
4987            base_url_http: Some("http://httpstat.us/500".to_string()),
4988            http_timeout_secs: Some(2),
4989            ..Default::default()
4990        };
4991
4992        let http_client = DydxHttpClient::new(
4993            config.base_url_http.clone(),
4994            config.http_timeout_secs,
4995            config.http_proxy_url.clone(),
4996            config.is_testnet,
4997            None,
4998        )
4999        .unwrap();
5000
5001        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5002
5003        let instrument = create_test_instrument_any();
5004        let instrument_id = instrument.id();
5005        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5006        client.instruments.insert(symbol_key, instrument);
5007
5008        let request = RequestTrades::new(
5009            instrument_id,
5010            None,
5011            None,
5012            None,
5013            Some(client_id),
5014            UUID4::new(),
5015            get_atomic_clock_realtime().get_time_ns(),
5016            None,
5017        );
5018
5019        assert!(client.request_trades(&request).is_ok());
5020
5021        // Should receive empty response on 500 error
5022        let timeout = tokio::time::Duration::from_secs(3);
5023        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5024            tokio::time::timeout(timeout, rx.recv()).await
5025        {
5026            assert!(resp.data.is_empty(), "Expected empty response on 500");
5027        }
5028    }
5029
5030    #[tokio::test]
5031    async fn test_network_timeout_handling() {
5032        // Test network timeout scenarios
5033        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5034        set_data_event_sender(sender);
5035
5036        let client_id = ClientId::from("DYDX-TIMEOUT");
5037        let config = DydxDataClientConfig {
5038            base_url_http: Some("http://10.255.255.1:81".to_string()), // Non-routable IP
5039            http_timeout_secs: Some(1),                                // Very short timeout
5040            ..Default::default()
5041        };
5042
5043        let http_client = DydxHttpClient::new(
5044            config.base_url_http.clone(),
5045            config.http_timeout_secs,
5046            config.http_proxy_url.clone(),
5047            config.is_testnet,
5048            None,
5049        )
5050        .unwrap();
5051
5052        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5053
5054        let request = RequestInstruments::new(
5055            None,
5056            None,
5057            Some(client_id),
5058            Some(*DYDX_VENUE),
5059            UUID4::new(),
5060            get_atomic_clock_realtime().get_time_ns(),
5061            None,
5062        );
5063
5064        assert!(client.request_instruments(&request).is_ok());
5065
5066        // Should timeout and return empty response
5067        let timeout = tokio::time::Duration::from_secs(3);
5068        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5069            tokio::time::timeout(timeout, rx.recv()).await
5070        {
5071            assert!(resp.data.is_empty(), "Expected empty response on timeout");
5072        }
5073    }
5074
5075    #[tokio::test]
5076    async fn test_connection_refused_handling() {
5077        // Test connection refused errors
5078        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5079        set_data_event_sender(sender);
5080
5081        let client_id = ClientId::from("DYDX-REFUSED");
5082        let config = DydxDataClientConfig {
5083            base_url_http: Some("http://localhost:9999".to_string()), // Port unlikely to be open
5084            http_timeout_secs: Some(1),
5085            ..Default::default()
5086        };
5087
5088        let http_client = DydxHttpClient::new(
5089            config.base_url_http.clone(),
5090            config.http_timeout_secs,
5091            config.http_proxy_url.clone(),
5092            config.is_testnet,
5093            None,
5094        )
5095        .unwrap();
5096
5097        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5098
5099        let instrument = create_test_instrument_any();
5100        let instrument_id = instrument.id();
5101        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5102        client.instruments.insert(symbol_key, instrument);
5103
5104        let request = RequestInstrument::new(
5105            instrument_id,
5106            None,
5107            None,
5108            Some(client_id),
5109            UUID4::new(),
5110            get_atomic_clock_realtime().get_time_ns(),
5111            None,
5112        );
5113
5114        assert!(client.request_instrument(&request).is_ok());
5115
5116        // Should handle connection refused gracefully
5117        let timeout = tokio::time::Duration::from_secs(2);
5118        let result = tokio::time::timeout(timeout, rx.recv()).await;
5119
5120        // May not receive response if connection fails before spawning handler
5121        // This is acceptable - the important part is no panic
5122        match result {
5123            Ok(Some(DataEvent::Response(_))) => {
5124                // Response received (empty data expected)
5125            }
5126            Ok(None) | Err(_) => {
5127                // No response or timeout - acceptable for connection refused
5128            }
5129            _ => {}
5130        }
5131    }
5132
5133    #[tokio::test]
5134    async fn test_dns_resolution_failure_handling() {
5135        // Test DNS resolution failures
5136        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5137        set_data_event_sender(sender);
5138
5139        let client_id = ClientId::from("DYDX-DNS");
5140        let config = DydxDataClientConfig {
5141            base_url_http: Some(
5142                "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5143            ),
5144            http_timeout_secs: Some(2),
5145            ..Default::default()
5146        };
5147
5148        let http_client = DydxHttpClient::new(
5149            config.base_url_http.clone(),
5150            config.http_timeout_secs,
5151            config.http_proxy_url.clone(),
5152            config.is_testnet,
5153            None,
5154        )
5155        .unwrap();
5156
5157        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5158
5159        let request = RequestInstruments::new(
5160            None,
5161            None,
5162            Some(client_id),
5163            Some(*DYDX_VENUE),
5164            UUID4::new(),
5165            get_atomic_clock_realtime().get_time_ns(),
5166            None,
5167        );
5168
5169        assert!(client.request_instruments(&request).is_ok());
5170
5171        // Should handle DNS failure gracefully with empty response
5172        let timeout = tokio::time::Duration::from_secs(3);
5173        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5174            tokio::time::timeout(timeout, rx.recv()).await
5175        {
5176            assert!(
5177                resp.data.is_empty(),
5178                "Expected empty response on DNS failure"
5179            );
5180        }
5181    }
5182
5183    #[tokio::test]
5184    async fn test_http_502_503_handling() {
5185        // Test HTTP 502/503 handling (bad gateway/service unavailable)
5186        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5187        set_data_event_sender(sender);
5188
5189        let client_id = ClientId::from("DYDX-503");
5190        let config = DydxDataClientConfig {
5191            base_url_http: Some("http://httpstat.us/503".to_string()),
5192            http_timeout_secs: Some(2),
5193            ..Default::default()
5194        };
5195
5196        let http_client = DydxHttpClient::new(
5197            config.base_url_http.clone(),
5198            config.http_timeout_secs,
5199            config.http_proxy_url.clone(),
5200            config.is_testnet,
5201            None,
5202        )
5203        .unwrap();
5204
5205        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5206
5207        let request = RequestInstruments::new(
5208            None,
5209            None,
5210            Some(client_id),
5211            Some(*DYDX_VENUE),
5212            UUID4::new(),
5213            get_atomic_clock_realtime().get_time_ns(),
5214            None,
5215        );
5216
5217        assert!(client.request_instruments(&request).is_ok());
5218
5219        // Should handle 503 gracefully with empty response
5220        let timeout = tokio::time::Duration::from_secs(3);
5221        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5222            tokio::time::timeout(timeout, rx.recv()).await
5223        {
5224            assert!(resp.data.is_empty(), "Expected empty response on 503");
5225        }
5226    }
5227
5228    #[tokio::test]
5229    async fn test_http_429_rate_limit_handling() {
5230        // Test HTTP 429 handling (rate limit exceeded)
5231        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5232        set_data_event_sender(sender);
5233
5234        let client_id = ClientId::from("DYDX-429");
5235        let config = DydxDataClientConfig {
5236            base_url_http: Some("http://httpstat.us/429".to_string()),
5237            http_timeout_secs: Some(2),
5238            ..Default::default()
5239        };
5240
5241        let http_client = DydxHttpClient::new(
5242            config.base_url_http.clone(),
5243            config.http_timeout_secs,
5244            config.http_proxy_url.clone(),
5245            config.is_testnet,
5246            None,
5247        )
5248        .unwrap();
5249
5250        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5251
5252        let instrument = create_test_instrument_any();
5253        let instrument_id = instrument.id();
5254        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5255        client.instruments.insert(symbol_key, instrument);
5256
5257        let request = RequestTrades::new(
5258            instrument_id,
5259            None,
5260            None,
5261            None,
5262            Some(client_id),
5263            UUID4::new(),
5264            get_atomic_clock_realtime().get_time_ns(),
5265            None,
5266        );
5267
5268        assert!(client.request_trades(&request).is_ok());
5269
5270        // Should handle rate limit with empty response
5271        let timeout = tokio::time::Duration::from_secs(3);
5272        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5273            tokio::time::timeout(timeout, rx.recv()).await
5274        {
5275            assert!(
5276                resp.data.is_empty(),
5277                "Expected empty response on rate limit"
5278            );
5279        }
5280    }
5281
5282    #[tokio::test]
5283    async fn test_error_handling_does_not_panic() {
5284        // Test that error scenarios don't cause panics
5285        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5286        set_data_event_sender(sender);
5287
5288        let client_id = ClientId::from("DYDX-NO-PANIC");
5289        let config = DydxDataClientConfig {
5290            base_url_http: Some("http://invalid".to_string()),
5291            ..Default::default()
5292        };
5293
5294        let http_client = DydxHttpClient::new(
5295            config.base_url_http.clone(),
5296            config.http_timeout_secs,
5297            config.http_proxy_url.clone(),
5298            config.is_testnet,
5299            None,
5300        )
5301        .unwrap();
5302
5303        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5304
5305        // All these should return Ok() without panicking
5306        let request_instruments = RequestInstruments::new(
5307            None,
5308            None,
5309            Some(client_id),
5310            Some(*DYDX_VENUE),
5311            UUID4::new(),
5312            get_atomic_clock_realtime().get_time_ns(),
5313            None,
5314        );
5315        assert!(client.request_instruments(&request_instruments).is_ok());
5316
5317        let instrument_id = InstrumentId::from("INVALID.DYDX");
5318        let request_instrument = RequestInstrument::new(
5319            instrument_id,
5320            None,
5321            None,
5322            Some(client_id),
5323            UUID4::new(),
5324            get_atomic_clock_realtime().get_time_ns(),
5325            None,
5326        );
5327        assert!(client.request_instrument(&request_instrument).is_ok());
5328
5329        let request_trades = RequestTrades::new(
5330            instrument_id,
5331            None,
5332            None,
5333            None,
5334            Some(client_id),
5335            UUID4::new(),
5336            get_atomic_clock_realtime().get_time_ns(),
5337            None,
5338        );
5339        assert!(client.request_trades(&request_trades).is_ok());
5340    }
5341
5342    #[tokio::test]
5343    async fn test_malformed_json_response() {
5344        // Test handling of malformed JSON from API
5345        use axum::{Router, routing::get};
5346
5347        #[derive(Clone)]
5348        struct MalformedState;
5349
5350        async fn malformed_markets_handler() -> String {
5351            // Invalid JSON - missing closing brace
5352            r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5353        }
5354
5355        let app = Router::new()
5356            .route("/v4/markets", get(malformed_markets_handler))
5357            .with_state(MalformedState);
5358
5359        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5360        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5361        let port = listener.local_addr().unwrap().port();
5362
5363        tokio::spawn(async move {
5364            axum::serve(listener, app).await.unwrap();
5365        });
5366
5367        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5368
5369        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5370        set_data_event_sender(sender);
5371
5372        let client_id = ClientId::from("DYDX-MALFORMED");
5373        let config = DydxDataClientConfig {
5374            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5375            http_timeout_secs: Some(2),
5376            ..Default::default()
5377        };
5378
5379        let http_client = DydxHttpClient::new(
5380            config.base_url_http.clone(),
5381            config.http_timeout_secs,
5382            config.http_proxy_url.clone(),
5383            config.is_testnet,
5384            None,
5385        )
5386        .unwrap();
5387
5388        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5389
5390        let request = RequestInstruments::new(
5391            None,
5392            None,
5393            Some(client_id),
5394            Some(*DYDX_VENUE),
5395            UUID4::new(),
5396            get_atomic_clock_realtime().get_time_ns(),
5397            None,
5398        );
5399
5400        assert!(client.request_instruments(&request).is_ok());
5401
5402        // Should handle malformed JSON gracefully with empty response
5403        let timeout = tokio::time::Duration::from_secs(3);
5404        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5405            tokio::time::timeout(timeout, rx.recv()).await
5406        {
5407            assert!(
5408                resp.data.is_empty(),
5409                "Expected empty response on malformed JSON"
5410            );
5411        }
5412    }
5413
5414    #[tokio::test]
5415    async fn test_missing_required_fields_in_response() {
5416        // Test handling when API response missing required fields
5417        use axum::{Json, Router, routing::get};
5418        use serde_json::{Value, json};
5419
5420        #[derive(Clone)]
5421        struct MissingFieldsState;
5422
5423        async fn missing_fields_handler() -> Json<Value> {
5424            // Missing critical fields like "ticker", "stepSize", etc.
5425            Json(json!({
5426                "markets": {
5427                    "BTC-USD": {
5428                        // Missing "ticker"
5429                        "status": "ACTIVE",
5430                        "baseAsset": "BTC",
5431                        "quoteAsset": "USD",
5432                        // Missing "stepSize", "tickSize", "minOrderSize"
5433                    }
5434                }
5435            }))
5436        }
5437
5438        let app = Router::new()
5439            .route("/v4/markets", get(missing_fields_handler))
5440            .with_state(MissingFieldsState);
5441
5442        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5443        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5444        let port = listener.local_addr().unwrap().port();
5445
5446        tokio::spawn(async move {
5447            axum::serve(listener, app).await.unwrap();
5448        });
5449
5450        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5451
5452        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5453        set_data_event_sender(sender);
5454
5455        let client_id = ClientId::from("DYDX-MISSING");
5456        let config = DydxDataClientConfig {
5457            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5458            http_timeout_secs: Some(2),
5459            ..Default::default()
5460        };
5461
5462        let http_client = DydxHttpClient::new(
5463            config.base_url_http.clone(),
5464            config.http_timeout_secs,
5465            config.http_proxy_url.clone(),
5466            config.is_testnet,
5467            None,
5468        )
5469        .unwrap();
5470
5471        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5472
5473        let request = RequestInstruments::new(
5474            None,
5475            None,
5476            Some(client_id),
5477            Some(*DYDX_VENUE),
5478            UUID4::new(),
5479            get_atomic_clock_realtime().get_time_ns(),
5480            None,
5481        );
5482
5483        assert!(client.request_instruments(&request).is_ok());
5484
5485        // Should handle missing fields gracefully (may skip instruments or return empty)
5486        let timeout = tokio::time::Duration::from_secs(3);
5487        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5488            tokio::time::timeout(timeout, rx.recv()).await
5489        {
5490            // Parse errors should result in empty or partial response
5491            // The important part is no panic
5492            assert!(resp.correlation_id == request.request_id);
5493        }
5494    }
5495
5496    #[tokio::test]
5497    async fn test_invalid_data_types_in_response() {
5498        // Test handling when API returns wrong data types
5499        use axum::{Json, Router, routing::get};
5500        use serde_json::{Value, json};
5501
5502        #[derive(Clone)]
5503        struct InvalidTypesState;
5504
5505        async fn invalid_types_handler() -> Json<Value> {
5506            // Wrong data types - strings instead of numbers, etc.
5507            Json(json!({
5508                "markets": {
5509                    "BTC-USD": {
5510                        "ticker": "BTC-USD",
5511                        "status": "ACTIVE",
5512                        "baseAsset": "BTC",
5513                        "quoteAsset": "USD",
5514                        "stepSize": "not_a_number",  // Should be numeric
5515                        "tickSize": true,  // Should be numeric
5516                        "minOrderSize": ["array"],  // Should be numeric
5517                        "market": 12345,  // Should be string
5518                    }
5519                }
5520            }))
5521        }
5522
5523        let app = Router::new()
5524            .route("/v4/markets", get(invalid_types_handler))
5525            .with_state(InvalidTypesState);
5526
5527        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5528        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5529        let port = listener.local_addr().unwrap().port();
5530
5531        tokio::spawn(async move {
5532            axum::serve(listener, app).await.unwrap();
5533        });
5534
5535        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5536
5537        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5538        set_data_event_sender(sender);
5539
5540        let client_id = ClientId::from("DYDX-TYPES");
5541        let config = DydxDataClientConfig {
5542            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5543            http_timeout_secs: Some(2),
5544            ..Default::default()
5545        };
5546
5547        let http_client = DydxHttpClient::new(
5548            config.base_url_http.clone(),
5549            config.http_timeout_secs,
5550            config.http_proxy_url.clone(),
5551            config.is_testnet,
5552            None,
5553        )
5554        .unwrap();
5555
5556        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5557
5558        let request = RequestInstruments::new(
5559            None,
5560            None,
5561            Some(client_id),
5562            Some(*DYDX_VENUE),
5563            UUID4::new(),
5564            get_atomic_clock_realtime().get_time_ns(),
5565            None,
5566        );
5567
5568        assert!(client.request_instruments(&request).is_ok());
5569
5570        // Should handle type errors gracefully
5571        let timeout = tokio::time::Duration::from_secs(3);
5572        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5573            tokio::time::timeout(timeout, rx.recv()).await
5574        {
5575            // Type mismatch should result in parse failure and empty/partial response
5576            assert!(resp.correlation_id == request.request_id);
5577        }
5578    }
5579
5580    #[tokio::test]
5581    async fn test_unexpected_response_structure() {
5582        // Test handling when API response has completely unexpected structure
5583        use axum::{Json, Router, routing::get};
5584        use serde_json::{Value, json};
5585
5586        #[derive(Clone)]
5587        struct UnexpectedState;
5588
5589        async fn unexpected_structure_handler() -> Json<Value> {
5590            // Completely different structure than expected
5591            Json(json!({
5592                "error": "Something went wrong",
5593                "code": 500,
5594                "data": null,
5595                "unexpected_field": {
5596                    "nested": {
5597                        "deeply": [1, 2, 3]
5598                    }
5599                }
5600            }))
5601        }
5602
5603        let app = Router::new()
5604            .route("/v4/markets", get(unexpected_structure_handler))
5605            .with_state(UnexpectedState);
5606
5607        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5608        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5609        let port = listener.local_addr().unwrap().port();
5610
5611        tokio::spawn(async move {
5612            axum::serve(listener, app).await.unwrap();
5613        });
5614
5615        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5616
5617        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5618        set_data_event_sender(sender);
5619
5620        let client_id = ClientId::from("DYDX-STRUCT");
5621        let config = DydxDataClientConfig {
5622            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5623            http_timeout_secs: Some(2),
5624            ..Default::default()
5625        };
5626
5627        let http_client = DydxHttpClient::new(
5628            config.base_url_http.clone(),
5629            config.http_timeout_secs,
5630            config.http_proxy_url.clone(),
5631            config.is_testnet,
5632            None,
5633        )
5634        .unwrap();
5635
5636        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5637
5638        let request = RequestInstruments::new(
5639            None,
5640            None,
5641            Some(client_id),
5642            Some(*DYDX_VENUE),
5643            UUID4::new(),
5644            get_atomic_clock_realtime().get_time_ns(),
5645            None,
5646        );
5647
5648        assert!(client.request_instruments(&request).is_ok());
5649
5650        // Should handle unexpected structure gracefully with empty response
5651        let timeout = tokio::time::Duration::from_secs(3);
5652        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5653            tokio::time::timeout(timeout, rx.recv()).await
5654        {
5655            assert!(
5656                resp.data.is_empty(),
5657                "Expected empty response on unexpected structure"
5658            );
5659            assert!(resp.correlation_id == request.request_id);
5660        }
5661    }
5662
5663    #[tokio::test]
5664    async fn test_empty_markets_object_in_response() {
5665        // Test handling when markets object is empty (valid JSON but no data)
5666        use axum::{Json, Router, routing::get};
5667        use serde_json::{Value, json};
5668
5669        #[derive(Clone)]
5670        struct EmptyMarketsState;
5671
5672        async fn empty_markets_handler() -> Json<Value> {
5673            Json(json!({
5674                "markets": {}
5675            }))
5676        }
5677
5678        let app = Router::new()
5679            .route("/v4/markets", get(empty_markets_handler))
5680            .with_state(EmptyMarketsState);
5681
5682        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5683        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5684        let port = listener.local_addr().unwrap().port();
5685
5686        tokio::spawn(async move {
5687            axum::serve(listener, app).await.unwrap();
5688        });
5689
5690        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5691
5692        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5693        set_data_event_sender(sender);
5694
5695        let client_id = ClientId::from("DYDX-EMPTY");
5696        let config = DydxDataClientConfig {
5697            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5698            http_timeout_secs: Some(2),
5699            ..Default::default()
5700        };
5701
5702        let http_client = DydxHttpClient::new(
5703            config.base_url_http.clone(),
5704            config.http_timeout_secs,
5705            config.http_proxy_url.clone(),
5706            config.is_testnet,
5707            None,
5708        )
5709        .unwrap();
5710
5711        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5712
5713        let request = RequestInstruments::new(
5714            None,
5715            None,
5716            Some(client_id),
5717            Some(*DYDX_VENUE),
5718            UUID4::new(),
5719            get_atomic_clock_realtime().get_time_ns(),
5720            None,
5721        );
5722
5723        assert!(client.request_instruments(&request).is_ok());
5724
5725        // Should handle empty markets gracefully
5726        let timeout = tokio::time::Duration::from_secs(3);
5727        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5728            tokio::time::timeout(timeout, rx.recv()).await
5729        {
5730            assert!(
5731                resp.data.is_empty(),
5732                "Expected empty response for empty markets"
5733            );
5734            assert!(resp.correlation_id == request.request_id);
5735        }
5736    }
5737
5738    #[tokio::test]
5739    async fn test_null_values_in_response() {
5740        // Test handling of null values in critical fields
5741        use axum::{Json, Router, routing::get};
5742        use serde_json::{Value, json};
5743
5744        #[derive(Clone)]
5745        struct NullValuesState;
5746
5747        async fn null_values_handler() -> Json<Value> {
5748            Json(json!({
5749                "markets": {
5750                    "BTC-USD": {
5751                        "ticker": null,
5752                        "status": "ACTIVE",
5753                        "baseAsset": null,
5754                        "quoteAsset": "USD",
5755                        "stepSize": null,
5756                    }
5757                }
5758            }))
5759        }
5760
5761        let app = Router::new()
5762            .route("/v4/markets", get(null_values_handler))
5763            .with_state(NullValuesState);
5764
5765        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5766        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5767        let port = listener.local_addr().unwrap().port();
5768
5769        tokio::spawn(async move {
5770            axum::serve(listener, app).await.unwrap();
5771        });
5772
5773        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5774
5775        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5776        set_data_event_sender(sender);
5777
5778        let client_id = ClientId::from("DYDX-NULL");
5779        let config = DydxDataClientConfig {
5780            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5781            http_timeout_secs: Some(2),
5782            ..Default::default()
5783        };
5784
5785        let http_client = DydxHttpClient::new(
5786            config.base_url_http.clone(),
5787            config.http_timeout_secs,
5788            config.http_proxy_url.clone(),
5789            config.is_testnet,
5790            None,
5791        )
5792        .unwrap();
5793
5794        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5795
5796        let request = RequestInstruments::new(
5797            None,
5798            None,
5799            Some(client_id),
5800            Some(*DYDX_VENUE),
5801            UUID4::new(),
5802            get_atomic_clock_realtime().get_time_ns(),
5803            None,
5804        );
5805
5806        assert!(client.request_instruments(&request).is_ok());
5807
5808        // Should handle null values gracefully
5809        let timeout = tokio::time::Duration::from_secs(3);
5810        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5811            tokio::time::timeout(timeout, rx.recv()).await
5812        {
5813            // Null values should cause parse failures and result in empty/partial response
5814            assert!(resp.correlation_id == request.request_id);
5815        }
5816    }
5817
5818    #[tokio::test]
5819    async fn test_invalid_instrument_id_format() {
5820        // Test handling of non-existent instrument (valid ID format but doesn't exist)
5821        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5822        set_data_event_sender(sender);
5823
5824        let client_id = ClientId::from("DYDX-INVALID-ID");
5825        let config = DydxDataClientConfig::default();
5826
5827        let http_client = DydxHttpClient::new(
5828            config.base_url_http.clone(),
5829            config.http_timeout_secs,
5830            config.http_proxy_url.clone(),
5831            config.is_testnet,
5832            None,
5833        )
5834        .unwrap();
5835
5836        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5837
5838        // Valid format but non-existent instrument
5839        let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5840
5841        let request = RequestInstrument::new(
5842            non_existent_id,
5843            None,
5844            None,
5845            Some(client_id),
5846            UUID4::new(),
5847            get_atomic_clock_realtime().get_time_ns(),
5848            None,
5849        );
5850
5851        assert!(client.request_instrument(&request).is_ok());
5852
5853        // Should handle non-existent instrument gracefully
5854        let timeout = tokio::time::Duration::from_secs(2);
5855        let result = tokio::time::timeout(timeout, rx.recv()).await;
5856
5857        // Either no response or empty response is acceptable for non-existent instrument
5858        match result {
5859            Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5860                // Empty response acceptable
5861            }
5862            Ok(None) | Err(_) => {
5863                // Timeout or no response also acceptable
5864            }
5865            _ => {}
5866        }
5867    }
5868
5869    #[tokio::test]
5870    async fn test_invalid_date_range_end_before_start() {
5871        // Test handling when end date is before start date
5872        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5873        set_data_event_sender(sender);
5874
5875        let client_id = ClientId::from("DYDX-DATE-RANGE");
5876        let config = DydxDataClientConfig::default();
5877
5878        let http_client = DydxHttpClient::new(
5879            config.base_url_http.clone(),
5880            config.http_timeout_secs,
5881            config.http_proxy_url.clone(),
5882            config.is_testnet,
5883            None,
5884        )
5885        .unwrap();
5886
5887        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5888
5889        let instrument = create_test_instrument_any();
5890        let instrument_id = instrument.id();
5891        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5892        client.instruments.insert(symbol_key, instrument);
5893
5894        // Invalid date range: end is before start
5895        let start = Utc::now();
5896        let end = start - chrono::Duration::hours(24); // End is 24 hours before start
5897
5898        let request = RequestTrades::new(
5899            instrument_id,
5900            Some(start),
5901            Some(end),
5902            None,
5903            Some(client_id),
5904            UUID4::new(),
5905            get_atomic_clock_realtime().get_time_ns(),
5906            None,
5907        );
5908
5909        assert!(client.request_trades(&request).is_ok());
5910
5911        // Should handle invalid range gracefully - may return empty or no response
5912        let timeout = tokio::time::Duration::from_secs(2);
5913        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5914            tokio::time::timeout(timeout, rx.recv()).await
5915        {
5916            // Empty response expected for invalid date range
5917            assert!(resp.correlation_id == request.request_id);
5918        }
5919    }
5920
5921    #[tokio::test]
5922    async fn test_negative_limit_value() {
5923        // Test handling of limit edge cases
5924        // Note: Rust's NonZeroUsize prevents negative/zero values at type level
5925        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5926        set_data_event_sender(sender);
5927
5928        let client_id = ClientId::from("DYDX-NEG-LIMIT");
5929        let config = DydxDataClientConfig::default();
5930
5931        let http_client = DydxHttpClient::new(
5932            config.base_url_http.clone(),
5933            config.http_timeout_secs,
5934            config.http_proxy_url.clone(),
5935            config.is_testnet,
5936            None,
5937        )
5938        .unwrap();
5939
5940        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5941
5942        let instrument = create_test_instrument_any();
5943        let instrument_id = instrument.id();
5944        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5945        client.instruments.insert(symbol_key, instrument);
5946
5947        // Minimum valid limit (1)
5948        let request = RequestTrades::new(
5949            instrument_id,
5950            None,
5951            None,
5952            std::num::NonZeroUsize::new(1), // Minimum valid value
5953            Some(client_id),
5954            UUID4::new(),
5955            get_atomic_clock_realtime().get_time_ns(),
5956            None,
5957        );
5958
5959        // Should not panic with minimum limit
5960        assert!(client.request_trades(&request).is_ok());
5961    }
5962
5963    #[tokio::test]
5964    async fn test_zero_limit_value() {
5965        // Test handling of no limit (None = use API default)
5966        // Note: NonZeroUsize type prevents actual zero, so None represents "no limit"
5967        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5968        set_data_event_sender(sender);
5969
5970        let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5971        let config = DydxDataClientConfig::default();
5972
5973        let http_client = DydxHttpClient::new(
5974            config.base_url_http.clone(),
5975            config.http_timeout_secs,
5976            config.http_proxy_url.clone(),
5977            config.is_testnet,
5978            None,
5979        )
5980        .unwrap();
5981
5982        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5983
5984        let instrument = create_test_instrument_any();
5985        let instrument_id = instrument.id();
5986        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5987        client.instruments.insert(symbol_key, instrument);
5988
5989        let request = RequestTrades::new(
5990            instrument_id,
5991            None,
5992            None,
5993            None, // No limit specified (None = use default)
5994            Some(client_id),
5995            UUID4::new(),
5996            get_atomic_clock_realtime().get_time_ns(),
5997            None,
5998        );
5999
6000        assert!(client.request_trades(&request).is_ok());
6001
6002        // Should handle None limit gracefully (uses API default)
6003        let timeout = tokio::time::Duration::from_secs(2);
6004        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6005            tokio::time::timeout(timeout, rx.recv()).await
6006        {
6007            assert!(resp.correlation_id == request.request_id);
6008        }
6009    }
6010
6011    #[tokio::test]
6012    async fn test_very_large_limit_value() {
6013        // Test handling of extremely large limit values (boundary testing)
6014        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6015        set_data_event_sender(sender);
6016
6017        let client_id = ClientId::from("DYDX-LARGE-LIMIT");
6018        let config = DydxDataClientConfig::default();
6019
6020        let http_client = DydxHttpClient::new(
6021            config.base_url_http.clone(),
6022            config.http_timeout_secs,
6023            config.http_proxy_url.clone(),
6024            config.is_testnet,
6025            None,
6026        )
6027        .unwrap();
6028
6029        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6030
6031        let instrument = create_test_instrument_any();
6032        let instrument_id = instrument.id();
6033        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6034        client.instruments.insert(symbol_key, instrument);
6035
6036        let request = RequestTrades::new(
6037            instrument_id,
6038            None,
6039            None,
6040            std::num::NonZeroUsize::new(1_000_000), // Very large limit
6041            Some(client_id),
6042            UUID4::new(),
6043            get_atomic_clock_realtime().get_time_ns(),
6044            None,
6045        );
6046
6047        // Should not panic with very large limit
6048        assert!(client.request_trades(&request).is_ok());
6049
6050        // Should handle large limit gracefully
6051        let timeout = tokio::time::Duration::from_secs(2);
6052        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6053            tokio::time::timeout(timeout, rx.recv()).await
6054        {
6055            assert!(resp.correlation_id == request.request_id);
6056        }
6057    }
6058
6059    #[tokio::test]
6060    async fn test_none_limit_uses_default() {
6061        // Test that None limit falls back to default behavior
6062        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6063        set_data_event_sender(sender);
6064
6065        let client_id = ClientId::from("DYDX-NONE-LIMIT");
6066        let config = DydxDataClientConfig::default();
6067
6068        let http_client = DydxHttpClient::new(
6069            config.base_url_http.clone(),
6070            config.http_timeout_secs,
6071            config.http_proxy_url.clone(),
6072            config.is_testnet,
6073            None,
6074        )
6075        .unwrap();
6076
6077        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6078
6079        let instrument = create_test_instrument_any();
6080        let instrument_id = instrument.id();
6081        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6082        client.instruments.insert(symbol_key, instrument);
6083
6084        let request = RequestTrades::new(
6085            instrument_id,
6086            None,
6087            None,
6088            None, // No limit specified
6089            Some(client_id),
6090            UUID4::new(),
6091            get_atomic_clock_realtime().get_time_ns(),
6092            None,
6093        );
6094
6095        // Should work fine with None limit (uses API default)
6096        assert!(client.request_trades(&request).is_ok());
6097
6098        let timeout = tokio::time::Duration::from_secs(2);
6099        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6100            tokio::time::timeout(timeout, rx.recv()).await
6101        {
6102            assert!(resp.correlation_id == request.request_id);
6103        }
6104    }
6105
6106    #[tokio::test]
6107    async fn test_validation_does_not_panic() {
6108        // Test that various validation edge cases don't cause panics
6109        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6110        set_data_event_sender(sender);
6111
6112        let client_id = ClientId::from("DYDX-VALIDATION");
6113        let config = DydxDataClientConfig::default();
6114
6115        let http_client = DydxHttpClient::new(
6116            config.base_url_http.clone(),
6117            config.http_timeout_secs,
6118            config.http_proxy_url.clone(),
6119            config.is_testnet,
6120            None,
6121        )
6122        .unwrap();
6123
6124        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6125
6126        let instrument = create_test_instrument_any();
6127        let instrument_id = instrument.id();
6128        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6129        client.instruments.insert(symbol_key, instrument);
6130
6131        // Test 1: Invalid instrument ID
6132        let invalid_id = InstrumentId::from("INVALID.WRONG");
6133        let req1 = RequestInstrument::new(
6134            invalid_id,
6135            None,
6136            None,
6137            Some(client_id),
6138            UUID4::new(),
6139            get_atomic_clock_realtime().get_time_ns(),
6140            None,
6141        );
6142        assert!(client.request_instrument(&req1).is_ok());
6143
6144        // Test 2: Invalid date range
6145        let start = Utc::now();
6146        let end = start - chrono::Duration::hours(1);
6147        let req2 = RequestTrades::new(
6148            instrument_id,
6149            Some(start),
6150            Some(end),
6151            None,
6152            Some(client_id),
6153            UUID4::new(),
6154            get_atomic_clock_realtime().get_time_ns(),
6155            None,
6156        );
6157        assert!(client.request_trades(&req2).is_ok());
6158
6159        // Test 3: Minimum limit (1)
6160        let req3 = RequestTrades::new(
6161            instrument_id,
6162            None,
6163            None,
6164            std::num::NonZeroUsize::new(1),
6165            Some(client_id),
6166            UUID4::new(),
6167            get_atomic_clock_realtime().get_time_ns(),
6168            None,
6169        );
6170        assert!(client.request_trades(&req3).is_ok());
6171
6172        // Test 4: Very large limit
6173        let req4 = RequestTrades::new(
6174            instrument_id,
6175            None,
6176            None,
6177            std::num::NonZeroUsize::new(usize::MAX),
6178            Some(client_id),
6179            UUID4::new(),
6180            get_atomic_clock_realtime().get_time_ns(),
6181            None,
6182        );
6183        assert!(client.request_trades(&req4).is_ok());
6184
6185        // All validation edge cases handled without panic
6186    }
6187
6188    #[tokio::test]
6189    async fn test_instruments_response_has_correct_venue() {
6190        // Verify InstrumentsResponse includes correct DYDX venue
6191        use axum::{Json, Router, routing::get};
6192        use serde_json::{Value, json};
6193
6194        #[derive(Clone)]
6195        struct VenueTestState;
6196
6197        async fn venue_handler() -> Json<Value> {
6198            Json(json!({
6199                "markets": {
6200                    "BTC-USD": {
6201                        "ticker": "BTC-USD",
6202                        "status": "ACTIVE",
6203                        "baseAsset": "BTC",
6204                        "quoteAsset": "USD",
6205                        "stepSize": "0.0001",
6206                        "tickSize": "1",
6207                        "indexPrice": "50000",
6208                        "oraclePrice": "50000",
6209                        "priceChange24H": "1000",
6210                        "nextFundingRate": "0.0001",
6211                        "nextFundingAt": "2024-01-01T00:00:00.000Z",
6212                        "minOrderSize": "0.001",
6213                        "type": "PERPETUAL",
6214                        "initialMarginFraction": "0.05",
6215                        "maintenanceMarginFraction": "0.03",
6216                        "volume24H": "1000000",
6217                        "trades24H": "10000",
6218                        "openInterest": "5000000",
6219                        "incrementalInitialMarginFraction": "0.01",
6220                        "incrementalPositionSize": "10",
6221                        "maxPositionSize": "1000",
6222                        "baselinePositionSize": "100",
6223                        "assetResolution": "10000000000"
6224                    }
6225                }
6226            }))
6227        }
6228
6229        let app = Router::new()
6230            .route("/v4/markets", get(venue_handler))
6231            .with_state(VenueTestState);
6232
6233        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
6234        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
6235        let port = listener.local_addr().unwrap().port();
6236
6237        tokio::spawn(async move {
6238            axum::serve(listener, app).await.unwrap();
6239        });
6240
6241        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
6242
6243        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6244        set_data_event_sender(sender);
6245
6246        let client_id = ClientId::from("DYDX-VENUE-TEST");
6247        let config = DydxDataClientConfig {
6248            base_url_http: Some(format!("http://127.0.0.1:{port}")),
6249            http_timeout_secs: Some(2),
6250            ..Default::default()
6251        };
6252
6253        let http_client = DydxHttpClient::new(
6254            config.base_url_http.clone(),
6255            config.http_timeout_secs,
6256            config.http_proxy_url.clone(),
6257            config.is_testnet,
6258            None,
6259        )
6260        .unwrap();
6261
6262        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6263
6264        let request = RequestInstruments::new(
6265            None,
6266            None,
6267            Some(client_id),
6268            Some(*DYDX_VENUE),
6269            UUID4::new(),
6270            get_atomic_clock_realtime().get_time_ns(),
6271            None,
6272        );
6273
6274        assert!(client.request_instruments(&request).is_ok());
6275
6276        let timeout = tokio::time::Duration::from_secs(3);
6277        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6278            tokio::time::timeout(timeout, rx.recv()).await
6279        {
6280            // Verify venue is DYDX
6281            assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6282        }
6283    }
6284
6285    #[tokio::test]
6286    async fn test_instruments_response_contains_vec_instrument_any() {
6287        // Verify InstrumentsResponse contains Vec<InstrumentAny>
6288        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6289        set_data_event_sender(sender);
6290
6291        let client_id = ClientId::from("DYDX-VEC-TEST");
6292        let config = DydxDataClientConfig::default();
6293
6294        let http_client = DydxHttpClient::new(
6295            config.base_url_http.clone(),
6296            config.http_timeout_secs,
6297            config.http_proxy_url.clone(),
6298            config.is_testnet,
6299            None,
6300        )
6301        .unwrap();
6302
6303        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6304
6305        let request = RequestInstruments::new(
6306            None,
6307            None,
6308            Some(client_id),
6309            Some(*DYDX_VENUE),
6310            UUID4::new(),
6311            get_atomic_clock_realtime().get_time_ns(),
6312            None,
6313        );
6314
6315        assert!(client.request_instruments(&request).is_ok());
6316
6317        let timeout = tokio::time::Duration::from_secs(2);
6318        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6319            tokio::time::timeout(timeout, rx.recv()).await
6320        {
6321            // Verify data is Vec<InstrumentAny>
6322            assert!(
6323                resp.data.is_empty() || !resp.data.is_empty(),
6324                "data should be Vec<InstrumentAny>"
6325            );
6326        }
6327    }
6328
6329    #[tokio::test]
6330    async fn test_instruments_response_includes_correlation_id() {
6331        // Verify InstrumentsResponse includes correlation_id matching request_id
6332        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6333        set_data_event_sender(sender);
6334
6335        let client_id = ClientId::from("DYDX-CORR-TEST");
6336        let config = DydxDataClientConfig::default();
6337
6338        let http_client = DydxHttpClient::new(
6339            config.base_url_http.clone(),
6340            config.http_timeout_secs,
6341            config.http_proxy_url.clone(),
6342            config.is_testnet,
6343            None,
6344        )
6345        .unwrap();
6346
6347        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6348
6349        let request_id = UUID4::new();
6350        let request = RequestInstruments::new(
6351            None,
6352            None,
6353            Some(client_id),
6354            Some(*DYDX_VENUE),
6355            request_id,
6356            get_atomic_clock_realtime().get_time_ns(),
6357            None,
6358        );
6359
6360        assert!(client.request_instruments(&request).is_ok());
6361
6362        let timeout = tokio::time::Duration::from_secs(2);
6363        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6364            tokio::time::timeout(timeout, rx.recv()).await
6365        {
6366            // Verify correlation_id matches request_id
6367            assert_eq!(
6368                resp.correlation_id, request_id,
6369                "correlation_id should match request_id"
6370            );
6371        }
6372    }
6373
6374    #[tokio::test]
6375    async fn test_instruments_response_includes_client_id() {
6376        // Verify InstrumentsResponse includes client_id
6377        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6378        set_data_event_sender(sender);
6379
6380        let client_id = ClientId::from("DYDX-CLIENT-TEST");
6381        let config = DydxDataClientConfig::default();
6382
6383        let http_client = DydxHttpClient::new(
6384            config.base_url_http.clone(),
6385            config.http_timeout_secs,
6386            config.http_proxy_url.clone(),
6387            config.is_testnet,
6388            None,
6389        )
6390        .unwrap();
6391
6392        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6393
6394        let request = RequestInstruments::new(
6395            None,
6396            None,
6397            Some(client_id),
6398            Some(*DYDX_VENUE),
6399            UUID4::new(),
6400            get_atomic_clock_realtime().get_time_ns(),
6401            None,
6402        );
6403
6404        assert!(client.request_instruments(&request).is_ok());
6405
6406        let timeout = tokio::time::Duration::from_secs(2);
6407        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6408            tokio::time::timeout(timeout, rx.recv()).await
6409        {
6410            // Verify client_id is included
6411            assert_eq!(
6412                resp.client_id, client_id,
6413                "client_id should be included in response"
6414            );
6415        }
6416    }
6417
6418    #[tokio::test]
6419    async fn test_instruments_response_includes_timestamps() {
6420        // Verify InstrumentsResponse includes start, end, and ts_init timestamps
6421        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6422        set_data_event_sender(sender);
6423
6424        let client_id = ClientId::from("DYDX-TS-TEST");
6425        let config = DydxDataClientConfig::default();
6426
6427        let http_client = DydxHttpClient::new(
6428            config.base_url_http.clone(),
6429            config.http_timeout_secs,
6430            config.http_proxy_url.clone(),
6431            config.is_testnet,
6432            None,
6433        )
6434        .unwrap();
6435
6436        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6437
6438        let start = Some(Utc::now() - chrono::Duration::days(1));
6439        let end = Some(Utc::now());
6440        let ts_init = get_atomic_clock_realtime().get_time_ns();
6441
6442        let request = RequestInstruments::new(
6443            start,
6444            end,
6445            Some(client_id),
6446            Some(*DYDX_VENUE),
6447            UUID4::new(),
6448            ts_init,
6449            None,
6450        );
6451
6452        assert!(client.request_instruments(&request).is_ok());
6453
6454        let timeout = tokio::time::Duration::from_secs(2);
6455        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6456            tokio::time::timeout(timeout, rx.recv()).await
6457        {
6458            // Verify timestamps are included
6459            assert!(
6460                resp.start.is_some() || resp.start.is_none(),
6461                "start timestamp field exists"
6462            );
6463            assert!(
6464                resp.end.is_some() || resp.end.is_none(),
6465                "end timestamp field exists"
6466            );
6467            assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6468        }
6469    }
6470
6471    #[tokio::test]
6472    async fn test_instruments_response_includes_params_when_provided() {
6473        // Verify InstrumentsResponse includes params when provided in request
6474        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6475        set_data_event_sender(sender);
6476
6477        let client_id = ClientId::from("DYDX-PARAMS-TEST");
6478        let config = DydxDataClientConfig::default();
6479
6480        let http_client = DydxHttpClient::new(
6481            config.base_url_http.clone(),
6482            config.http_timeout_secs,
6483            config.http_proxy_url.clone(),
6484            config.is_testnet,
6485            None,
6486        )
6487        .unwrap();
6488
6489        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6490
6491        // Since we can't easily create IndexMap in tests without importing,
6492        // just verify the params field exists by passing None
6493        let request = RequestInstruments::new(
6494            None,
6495            None,
6496            Some(client_id),
6497            Some(*DYDX_VENUE),
6498            UUID4::new(),
6499            get_atomic_clock_realtime().get_time_ns(),
6500            None, // params
6501        );
6502
6503        assert!(client.request_instruments(&request).is_ok());
6504
6505        let timeout = tokio::time::Duration::from_secs(2);
6506        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6507            tokio::time::timeout(timeout, rx.recv()).await
6508        {
6509            // Verify params field exists (structure validation)
6510            let _params = resp.params;
6511        }
6512    }
6513
6514    #[tokio::test]
6515    async fn test_instruments_response_params_none_when_not_provided() {
6516        // Verify InstrumentsResponse params is None when not provided
6517        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6518        set_data_event_sender(sender);
6519
6520        let client_id = ClientId::from("DYDX-NO-PARAMS");
6521        let config = DydxDataClientConfig::default();
6522
6523        let http_client = DydxHttpClient::new(
6524            config.base_url_http.clone(),
6525            config.http_timeout_secs,
6526            config.http_proxy_url.clone(),
6527            config.is_testnet,
6528            None,
6529        )
6530        .unwrap();
6531
6532        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6533
6534        let request = RequestInstruments::new(
6535            None,
6536            None,
6537            Some(client_id),
6538            Some(*DYDX_VENUE),
6539            UUID4::new(),
6540            get_atomic_clock_realtime().get_time_ns(),
6541            None, // No params
6542        );
6543
6544        assert!(client.request_instruments(&request).is_ok());
6545
6546        let timeout = tokio::time::Duration::from_secs(2);
6547        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6548            tokio::time::timeout(timeout, rx.recv()).await
6549        {
6550            // Verify params field exists and is None when not provided
6551            assert!(
6552                resp.params.is_none(),
6553                "params should be None when not provided"
6554            );
6555        }
6556    }
6557
6558    #[tokio::test]
6559    async fn test_instruments_response_complete_structure() {
6560        // Comprehensive test verifying all InstrumentsResponse fields
6561        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6562        set_data_event_sender(sender);
6563
6564        let client_id = ClientId::from("DYDX-FULL-TEST");
6565        let config = DydxDataClientConfig::default();
6566
6567        let http_client = DydxHttpClient::new(
6568            config.base_url_http.clone(),
6569            config.http_timeout_secs,
6570            config.http_proxy_url.clone(),
6571            config.is_testnet,
6572            None,
6573        )
6574        .unwrap();
6575
6576        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6577
6578        let request_id = UUID4::new();
6579        let start = Some(Utc::now() - chrono::Duration::hours(1));
6580        let end = Some(Utc::now());
6581        let ts_init = get_atomic_clock_realtime().get_time_ns();
6582
6583        let request = RequestInstruments::new(
6584            start,
6585            end,
6586            Some(client_id),
6587            Some(*DYDX_VENUE),
6588            request_id,
6589            ts_init,
6590            None,
6591        );
6592
6593        assert!(client.request_instruments(&request).is_ok());
6594
6595        let timeout = tokio::time::Duration::from_secs(2);
6596        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6597            tokio::time::timeout(timeout, rx.recv()).await
6598        {
6599            // Comprehensive validation of all fields
6600            assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6601            assert_eq!(
6602                resp.correlation_id, request_id,
6603                "correlation_id should match"
6604            );
6605            assert_eq!(resp.client_id, client_id, "client_id should match");
6606            assert!(resp.ts_init > 0, "ts_init should be set");
6607
6608            // data field exists (Vec<InstrumentAny>)
6609            let _data: Vec<InstrumentAny> = resp.data;
6610
6611            // Timestamp fields can be present or None
6612            let _start = resp.start;
6613            let _end = resp.end;
6614            let _params = resp.params;
6615        }
6616    }
6617
6618    #[tokio::test]
6619    async fn test_instrument_response_properly_boxed() {
6620        // Verify InstrumentResponse is properly boxed in DataResponse
6621        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6622        set_data_event_sender(sender);
6623
6624        let client_id = ClientId::from("DYDX-BOXED-TEST");
6625        let config = DydxDataClientConfig::default();
6626
6627        let http_client = DydxHttpClient::new(
6628            config.base_url_http.clone(),
6629            config.http_timeout_secs,
6630            config.http_proxy_url.clone(),
6631            config.is_testnet,
6632            None,
6633        )
6634        .unwrap();
6635
6636        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6637
6638        let instrument = create_test_instrument_any();
6639        let instrument_id = instrument.id();
6640        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6641        client.instruments.insert(symbol_key, instrument);
6642
6643        let request = RequestInstrument::new(
6644            instrument_id,
6645            None,
6646            None,
6647            Some(client_id),
6648            UUID4::new(),
6649            get_atomic_clock_realtime().get_time_ns(),
6650            None,
6651        );
6652
6653        assert!(client.request_instrument(&request).is_ok());
6654
6655        let timeout = tokio::time::Duration::from_secs(2);
6656        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6657            tokio::time::timeout(timeout, rx.recv()).await
6658        {
6659            // Verify it's boxed - we receive Box<InstrumentResponse>
6660            let _response: Box<InstrumentResponse> = boxed_resp;
6661            // Successfully matched boxed pattern
6662        }
6663    }
6664
6665    #[tokio::test]
6666    async fn test_instrument_response_contains_single_instrument() {
6667        // Verify InstrumentResponse contains single InstrumentAny
6668        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6669        set_data_event_sender(sender);
6670
6671        let client_id = ClientId::from("DYDX-SINGLE-TEST");
6672        let config = DydxDataClientConfig::default();
6673
6674        let http_client = DydxHttpClient::new(
6675            config.base_url_http.clone(),
6676            config.http_timeout_secs,
6677            config.http_proxy_url.clone(),
6678            config.is_testnet,
6679            None,
6680        )
6681        .unwrap();
6682
6683        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6684
6685        let instrument = create_test_instrument_any();
6686        let instrument_id = instrument.id();
6687        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6688        client.instruments.insert(symbol_key, instrument.clone());
6689
6690        let request = RequestInstrument::new(
6691            instrument_id,
6692            None,
6693            None,
6694            Some(client_id),
6695            UUID4::new(),
6696            get_atomic_clock_realtime().get_time_ns(),
6697            None,
6698        );
6699
6700        assert!(client.request_instrument(&request).is_ok());
6701
6702        let timeout = tokio::time::Duration::from_secs(2);
6703        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6704            tokio::time::timeout(timeout, rx.recv()).await
6705        {
6706            // Verify data contains single InstrumentAny
6707            let _instrument: InstrumentAny = resp.data;
6708            // Successfully matched InstrumentAny type
6709        }
6710    }
6711
6712    #[tokio::test]
6713    async fn test_instrument_response_has_correct_instrument_id() {
6714        // Verify InstrumentResponse has correct instrument_id
6715        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6716        set_data_event_sender(sender);
6717
6718        let client_id = ClientId::from("DYDX-ID-TEST");
6719        let config = DydxDataClientConfig::default();
6720
6721        let http_client = DydxHttpClient::new(
6722            config.base_url_http.clone(),
6723            config.http_timeout_secs,
6724            config.http_proxy_url.clone(),
6725            config.is_testnet,
6726            None,
6727        )
6728        .unwrap();
6729
6730        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6731
6732        let instrument = create_test_instrument_any();
6733        let instrument_id = instrument.id();
6734        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6735        client.instruments.insert(symbol_key, instrument);
6736
6737        let request = RequestInstrument::new(
6738            instrument_id,
6739            None,
6740            None,
6741            Some(client_id),
6742            UUID4::new(),
6743            get_atomic_clock_realtime().get_time_ns(),
6744            None,
6745        );
6746
6747        assert!(client.request_instrument(&request).is_ok());
6748
6749        let timeout = tokio::time::Duration::from_secs(2);
6750        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6751            tokio::time::timeout(timeout, rx.recv()).await
6752        {
6753            // Verify instrument_id matches request
6754            assert_eq!(
6755                resp.instrument_id, instrument_id,
6756                "instrument_id should match requested ID"
6757            );
6758        }
6759    }
6760
6761    #[tokio::test]
6762    async fn test_instrument_response_includes_metadata() {
6763        // Verify InstrumentResponse includes all metadata fields
6764        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6765        set_data_event_sender(sender);
6766
6767        let client_id = ClientId::from("DYDX-META-TEST");
6768        let config = DydxDataClientConfig::default();
6769
6770        let http_client = DydxHttpClient::new(
6771            config.base_url_http.clone(),
6772            config.http_timeout_secs,
6773            config.http_proxy_url.clone(),
6774            config.is_testnet,
6775            None,
6776        )
6777        .unwrap();
6778
6779        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6780
6781        let instrument = create_test_instrument_any();
6782        let instrument_id = instrument.id();
6783        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6784        client.instruments.insert(symbol_key, instrument);
6785
6786        let request_id = UUID4::new();
6787        let start = Some(Utc::now() - chrono::Duration::hours(1));
6788        let end = Some(Utc::now());
6789        let ts_init = get_atomic_clock_realtime().get_time_ns();
6790
6791        let request = RequestInstrument::new(
6792            instrument_id,
6793            start,
6794            end,
6795            Some(client_id),
6796            request_id,
6797            ts_init,
6798            None,
6799        );
6800
6801        assert!(client.request_instrument(&request).is_ok());
6802
6803        let timeout = tokio::time::Duration::from_secs(2);
6804        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6805            tokio::time::timeout(timeout, rx.recv()).await
6806        {
6807            // Verify all metadata fields
6808            assert_eq!(
6809                resp.correlation_id, request_id,
6810                "correlation_id should match"
6811            );
6812            assert_eq!(resp.client_id, client_id, "client_id should match");
6813            assert!(resp.ts_init > 0, "ts_init should be set");
6814
6815            // Timestamp fields exist (can be Some or None)
6816            let _start = resp.start;
6817            let _end = resp.end;
6818
6819            // Params field exists
6820            let _params = resp.params;
6821        }
6822    }
6823
6824    #[tokio::test]
6825    async fn test_instrument_response_matches_requested_instrument() {
6826        // Verify InstrumentResponse data matches the requested instrument exactly
6827        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6828        set_data_event_sender(sender);
6829
6830        let client_id = ClientId::from("DYDX-MATCH-TEST");
6831        let config = DydxDataClientConfig::default();
6832
6833        let http_client = DydxHttpClient::new(
6834            config.base_url_http.clone(),
6835            config.http_timeout_secs,
6836            config.http_proxy_url.clone(),
6837            config.is_testnet,
6838            None,
6839        )
6840        .unwrap();
6841
6842        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6843
6844        let instrument = create_test_instrument_any();
6845        let instrument_id = instrument.id();
6846        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6847        client.instruments.insert(symbol_key, instrument.clone());
6848
6849        let request = RequestInstrument::new(
6850            instrument_id,
6851            None,
6852            None,
6853            Some(client_id),
6854            UUID4::new(),
6855            get_atomic_clock_realtime().get_time_ns(),
6856            None,
6857        );
6858
6859        assert!(client.request_instrument(&request).is_ok());
6860
6861        let timeout = tokio::time::Duration::from_secs(2);
6862        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6863            tokio::time::timeout(timeout, rx.recv()).await
6864        {
6865            // Verify returned instrument matches requested instrument
6866            assert_eq!(
6867                resp.data.id(),
6868                instrument_id,
6869                "Returned instrument should match requested"
6870            );
6871            assert_eq!(
6872                resp.instrument_id, instrument_id,
6873                "instrument_id field should match"
6874            );
6875
6876            // Both should point to the same instrument
6877            assert_eq!(
6878                resp.data.id(),
6879                resp.instrument_id,
6880                "data.id() should match instrument_id field"
6881            );
6882        }
6883    }
6884
6885    #[tokio::test]
6886    async fn test_instrument_response_complete_structure() {
6887        // Comprehensive test verifying all InstrumentResponse fields
6888        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6889        set_data_event_sender(sender);
6890
6891        let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6892        let config = DydxDataClientConfig::default();
6893
6894        let http_client = DydxHttpClient::new(
6895            config.base_url_http.clone(),
6896            config.http_timeout_secs,
6897            config.http_proxy_url.clone(),
6898            config.is_testnet,
6899            None,
6900        )
6901        .unwrap();
6902
6903        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6904
6905        let instrument = create_test_instrument_any();
6906        let instrument_id = instrument.id();
6907        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6908        client.instruments.insert(symbol_key, instrument.clone());
6909
6910        let request_id = UUID4::new();
6911        let ts_init = get_atomic_clock_realtime().get_time_ns();
6912
6913        let request = RequestInstrument::new(
6914            instrument_id,
6915            None,
6916            None,
6917            Some(client_id),
6918            request_id,
6919            ts_init,
6920            None,
6921        );
6922
6923        assert!(client.request_instrument(&request).is_ok());
6924
6925        let timeout = tokio::time::Duration::from_secs(2);
6926        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6927            tokio::time::timeout(timeout, rx.recv()).await
6928        {
6929            // Comprehensive validation
6930            // 1. Boxed structure
6931            let _boxed: Box<InstrumentResponse> = resp.clone();
6932
6933            // 2. All required fields present
6934            assert_eq!(resp.correlation_id, request_id);
6935            assert_eq!(resp.client_id, client_id);
6936            assert_eq!(resp.instrument_id, instrument_id);
6937            assert!(resp.ts_init > 0);
6938
6939            // 3. Data field contains InstrumentAny
6940            let returned_instrument: InstrumentAny = resp.data;
6941            assert_eq!(returned_instrument.id(), instrument_id);
6942
6943            // 4. Optional fields exist
6944            let _start = resp.start;
6945            let _end = resp.end;
6946            let _params = resp.params;
6947        }
6948    }
6949
6950    #[tokio::test]
6951    async fn test_trades_response_contains_vec_trade_tick() {
6952        // Verify TradesResponse.data is Vec<TradeTick>
6953        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6954        set_data_event_sender(sender);
6955
6956        let created_at = Utc::now();
6957        let http_trades = vec![
6958            crate::http::models::Trade {
6959                id: "trade-1".to_string(),
6960                side: OrderSide::Buy,
6961                size: dec!(1.0),
6962                price: dec!(100.0),
6963                created_at,
6964                created_at_height: 100,
6965                trade_type: crate::common::enums::DydxTradeType::Limit,
6966            },
6967            crate::http::models::Trade {
6968                id: "trade-2".to_string(),
6969                side: OrderSide::Sell,
6970                size: dec!(2.0),
6971                price: dec!(101.0),
6972                created_at: created_at + chrono::Duration::seconds(1),
6973                created_at_height: 101,
6974                trade_type: crate::common::enums::DydxTradeType::Limit,
6975            },
6976        ];
6977
6978        let trades_response = crate::http::models::TradesResponse {
6979            trades: http_trades,
6980        };
6981
6982        let state = TradesTestState {
6983            response: Arc::new(trades_response),
6984            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
6985            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
6986        };
6987
6988        let addr = start_trades_test_server(state).await;
6989        let base_url = format!("http://{addr}");
6990
6991        let client_id = ClientId::from("DYDX-VEC-TEST");
6992        let config = DydxDataClientConfig {
6993            base_url_http: Some(base_url),
6994            is_testnet: true,
6995            ..Default::default()
6996        };
6997
6998        let http_client = DydxHttpClient::new(
6999            config.base_url_http.clone(),
7000            config.http_timeout_secs,
7001            config.http_proxy_url.clone(),
7002            config.is_testnet,
7003            None,
7004        )
7005        .unwrap();
7006
7007        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7008
7009        let instrument = create_test_instrument_any();
7010        let instrument_id = instrument.id();
7011        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7012        client.instruments.insert(symbol_key, instrument);
7013
7014        let request = RequestTrades::new(
7015            instrument_id,
7016            None,
7017            None,
7018            None,
7019            Some(client_id),
7020            UUID4::new(),
7021            get_atomic_clock_realtime().get_time_ns(),
7022            None,
7023        );
7024
7025        assert!(client.request_trades(&request).is_ok());
7026
7027        let timeout = tokio::time::Duration::from_millis(500);
7028        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7029            tokio::time::timeout(timeout, rx.recv()).await
7030        {
7031            // Verify data is Vec<TradeTick>
7032            let trade_ticks: Vec<TradeTick> = resp.data;
7033            assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
7034
7035            // Each element is a TradeTick
7036            for tick in &trade_ticks {
7037                assert_eq!(tick.instrument_id, instrument_id);
7038            }
7039        }
7040    }
7041
7042    #[tokio::test]
7043    async fn test_trades_response_has_correct_instrument_id() {
7044        // Verify TradesResponse.instrument_id matches request
7045        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7046        set_data_event_sender(sender);
7047
7048        let created_at = Utc::now();
7049        let http_trade = crate::http::models::Trade {
7050            id: "instrument-id-test".to_string(),
7051            side: OrderSide::Buy,
7052            size: dec!(1.0),
7053            price: dec!(100.0),
7054            created_at,
7055            created_at_height: 100,
7056            trade_type: crate::common::enums::DydxTradeType::Limit,
7057        };
7058
7059        let trades_response = crate::http::models::TradesResponse {
7060            trades: vec![http_trade],
7061        };
7062
7063        let state = TradesTestState {
7064            response: Arc::new(trades_response),
7065            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7066            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7067        };
7068
7069        let addr = start_trades_test_server(state).await;
7070        let base_url = format!("http://{addr}");
7071
7072        let client_id = ClientId::from("DYDX-INSTID-TEST");
7073        let config = DydxDataClientConfig {
7074            base_url_http: Some(base_url),
7075            is_testnet: true,
7076            ..Default::default()
7077        };
7078
7079        let http_client = DydxHttpClient::new(
7080            config.base_url_http.clone(),
7081            config.http_timeout_secs,
7082            config.http_proxy_url.clone(),
7083            config.is_testnet,
7084            None,
7085        )
7086        .unwrap();
7087
7088        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7089
7090        let instrument = create_test_instrument_any();
7091        let instrument_id = instrument.id();
7092        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7093        client.instruments.insert(symbol_key, instrument);
7094
7095        let request = RequestTrades::new(
7096            instrument_id,
7097            None,
7098            None,
7099            None,
7100            Some(client_id),
7101            UUID4::new(),
7102            get_atomic_clock_realtime().get_time_ns(),
7103            None,
7104        );
7105
7106        assert!(client.request_trades(&request).is_ok());
7107
7108        let timeout = tokio::time::Duration::from_millis(500);
7109        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7110            tokio::time::timeout(timeout, rx.recv()).await
7111        {
7112            // Verify instrument_id field matches request
7113            assert_eq!(
7114                resp.instrument_id, instrument_id,
7115                "TradesResponse.instrument_id should match request"
7116            );
7117
7118            // Verify all trade ticks have the same instrument_id
7119            for tick in &resp.data {
7120                assert_eq!(
7121                    tick.instrument_id, instrument_id,
7122                    "Each TradeTick should have correct instrument_id"
7123                );
7124            }
7125        }
7126    }
7127
7128    #[tokio::test]
7129    async fn test_trades_response_properly_ordered_by_timestamp() {
7130        // Verify trades are ordered by timestamp (ascending)
7131        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7132        set_data_event_sender(sender);
7133
7134        let base_time = Utc::now();
7135        let http_trades = vec![
7136            crate::http::models::Trade {
7137                id: "trade-oldest".to_string(),
7138                side: OrderSide::Buy,
7139                size: dec!(1.0),
7140                price: dec!(100.0),
7141                created_at: base_time,
7142                created_at_height: 100,
7143                trade_type: crate::common::enums::DydxTradeType::Limit,
7144            },
7145            crate::http::models::Trade {
7146                id: "trade-middle".to_string(),
7147                side: OrderSide::Sell,
7148                size: dec!(2.0),
7149                price: dec!(101.0),
7150                created_at: base_time + chrono::Duration::seconds(1),
7151                created_at_height: 101,
7152                trade_type: crate::common::enums::DydxTradeType::Limit,
7153            },
7154            crate::http::models::Trade {
7155                id: "trade-newest".to_string(),
7156                side: OrderSide::Buy,
7157                size: dec!(3.0),
7158                price: dec!(102.0),
7159                created_at: base_time + chrono::Duration::seconds(2),
7160                created_at_height: 102,
7161                trade_type: crate::common::enums::DydxTradeType::Limit,
7162            },
7163        ];
7164
7165        let trades_response = crate::http::models::TradesResponse {
7166            trades: http_trades,
7167        };
7168
7169        let state = TradesTestState {
7170            response: Arc::new(trades_response),
7171            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7172            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7173        };
7174
7175        let addr = start_trades_test_server(state).await;
7176        let base_url = format!("http://{addr}");
7177
7178        let client_id = ClientId::from("DYDX-ORDER-TEST");
7179        let config = DydxDataClientConfig {
7180            base_url_http: Some(base_url),
7181            is_testnet: true,
7182            ..Default::default()
7183        };
7184
7185        let http_client = DydxHttpClient::new(
7186            config.base_url_http.clone(),
7187            config.http_timeout_secs,
7188            config.http_proxy_url.clone(),
7189            config.is_testnet,
7190            None,
7191        )
7192        .unwrap();
7193
7194        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7195
7196        let instrument = create_test_instrument_any();
7197        let instrument_id = instrument.id();
7198        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7199        client.instruments.insert(symbol_key, instrument);
7200
7201        let request = RequestTrades::new(
7202            instrument_id,
7203            None,
7204            None,
7205            None,
7206            Some(client_id),
7207            UUID4::new(),
7208            get_atomic_clock_realtime().get_time_ns(),
7209            None,
7210        );
7211
7212        assert!(client.request_trades(&request).is_ok());
7213
7214        let timeout = tokio::time::Duration::from_millis(500);
7215        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7216            tokio::time::timeout(timeout, rx.recv()).await
7217        {
7218            // Verify trades are ordered by timestamp
7219            let trade_ticks = resp.data;
7220            assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7221
7222            // Check ascending timestamp order
7223            for i in 1..trade_ticks.len() {
7224                assert!(
7225                    trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7226                    "Trades should be ordered by timestamp (ts_event) in ascending order"
7227                );
7228            }
7229
7230            // Verify specific ordering
7231            assert!(
7232                trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7233                "First trade should be before second"
7234            );
7235            assert!(
7236                trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7237                "Second trade should be before third"
7238            );
7239        }
7240    }
7241
7242    #[tokio::test]
7243    async fn test_trades_response_all_trade_tick_fields_populated() {
7244        // Verify all TradeTick fields are properly populated
7245        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7246        set_data_event_sender(sender);
7247
7248        let created_at = Utc::now();
7249        let http_trade = crate::http::models::Trade {
7250            id: "field-test".to_string(),
7251            side: OrderSide::Buy,
7252            size: dec!(5.5),
7253            price: dec!(12345.67),
7254            created_at,
7255            created_at_height: 999,
7256            trade_type: crate::common::enums::DydxTradeType::Limit,
7257        };
7258
7259        let trades_response = crate::http::models::TradesResponse {
7260            trades: vec![http_trade],
7261        };
7262
7263        let state = TradesTestState {
7264            response: Arc::new(trades_response),
7265            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7266            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7267        };
7268
7269        let addr = start_trades_test_server(state).await;
7270        let base_url = format!("http://{addr}");
7271
7272        let client_id = ClientId::from("DYDX-FIELDS-TEST");
7273        let config = DydxDataClientConfig {
7274            base_url_http: Some(base_url),
7275            is_testnet: true,
7276            ..Default::default()
7277        };
7278
7279        let http_client = DydxHttpClient::new(
7280            config.base_url_http.clone(),
7281            config.http_timeout_secs,
7282            config.http_proxy_url.clone(),
7283            config.is_testnet,
7284            None,
7285        )
7286        .unwrap();
7287
7288        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7289
7290        let instrument = create_test_instrument_any();
7291        let instrument_id = instrument.id();
7292        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7293        client.instruments.insert(symbol_key, instrument);
7294
7295        let request = RequestTrades::new(
7296            instrument_id,
7297            None,
7298            None,
7299            None,
7300            Some(client_id),
7301            UUID4::new(),
7302            get_atomic_clock_realtime().get_time_ns(),
7303            None,
7304        );
7305
7306        assert!(client.request_trades(&request).is_ok());
7307
7308        let timeout = tokio::time::Duration::from_millis(500);
7309        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7310            tokio::time::timeout(timeout, rx.recv()).await
7311        {
7312            assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7313            let tick = &resp.data[0];
7314
7315            // Verify all TradeTick fields are properly populated
7316            assert_eq!(
7317                tick.instrument_id, instrument_id,
7318                "instrument_id should be set"
7319            );
7320            assert!(tick.price.as_f64() > 0.0, "price should be positive");
7321            assert!(tick.size.as_f64() > 0.0, "size should be positive");
7322
7323            // Verify aggressor_side is set (Buy or Sell)
7324            match tick.aggressor_side {
7325                AggressorSide::Buyer | AggressorSide::Seller => {
7326                    // Valid aggressor side
7327                }
7328                AggressorSide::NoAggressor => {
7329                    panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7330                }
7331            }
7332
7333            // Verify trade_id is set
7334            assert!(
7335                !tick.trade_id.to_string().is_empty(),
7336                "trade_id should be set"
7337            );
7338
7339            // Verify timestamps are set and valid
7340            assert!(tick.ts_event > 0, "ts_event should be set");
7341            assert!(tick.ts_init > 0, "ts_init should be set");
7342            assert!(
7343                tick.ts_init >= tick.ts_event,
7344                "ts_init should be >= ts_event"
7345            );
7346        }
7347    }
7348
7349    #[tokio::test]
7350    async fn test_trades_response_includes_metadata() {
7351        // Verify TradesResponse includes all metadata fields
7352        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7353        set_data_event_sender(sender);
7354
7355        let created_at = Utc::now();
7356        let http_trade = crate::http::models::Trade {
7357            id: "metadata-test".to_string(),
7358            side: OrderSide::Buy,
7359            size: dec!(1.0),
7360            price: dec!(100.0),
7361            created_at,
7362            created_at_height: 100,
7363            trade_type: crate::common::enums::DydxTradeType::Limit,
7364        };
7365
7366        let trades_response = crate::http::models::TradesResponse {
7367            trades: vec![http_trade],
7368        };
7369
7370        let state = TradesTestState {
7371            response: Arc::new(trades_response),
7372            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7373            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7374        };
7375
7376        let addr = start_trades_test_server(state).await;
7377        let base_url = format!("http://{addr}");
7378
7379        let client_id = ClientId::from("DYDX-META-TEST");
7380        let config = DydxDataClientConfig {
7381            base_url_http: Some(base_url),
7382            is_testnet: true,
7383            ..Default::default()
7384        };
7385
7386        let http_client = DydxHttpClient::new(
7387            config.base_url_http.clone(),
7388            config.http_timeout_secs,
7389            config.http_proxy_url.clone(),
7390            config.is_testnet,
7391            None,
7392        )
7393        .unwrap();
7394
7395        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7396
7397        let instrument = create_test_instrument_any();
7398        let instrument_id = instrument.id();
7399        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7400        client.instruments.insert(symbol_key, instrument);
7401
7402        let request_id = UUID4::new();
7403        let request = RequestTrades::new(
7404            instrument_id,
7405            None,
7406            None,
7407            None,
7408            Some(client_id),
7409            request_id,
7410            get_atomic_clock_realtime().get_time_ns(),
7411            None,
7412        );
7413
7414        assert!(client.request_trades(&request).is_ok());
7415
7416        let timeout = tokio::time::Duration::from_millis(500);
7417        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7418            tokio::time::timeout(timeout, rx.recv()).await
7419        {
7420            // Verify metadata fields
7421            assert_eq!(
7422                resp.correlation_id, request_id,
7423                "correlation_id should match request"
7424            );
7425            assert_eq!(resp.client_id, client_id, "client_id should be set");
7426            assert_eq!(
7427                resp.instrument_id, instrument_id,
7428                "instrument_id should be set"
7429            );
7430            assert!(resp.ts_init > 0, "ts_init should be set");
7431
7432            let _start = resp.start;
7433            let _end = resp.end;
7434            let _params = resp.params;
7435        }
7436    }
7437
7438    #[tokio::test]
7439    async fn test_orderbook_cache_growth_with_many_instruments() {
7440        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7441        set_data_event_sender(sender);
7442
7443        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7444        let config = DydxDataClientConfig {
7445            base_url_http: Some(base_url),
7446            is_testnet: true,
7447            ..Default::default()
7448        };
7449
7450        let http_client = DydxHttpClient::new(
7451            config.base_url_http.clone(),
7452            config.http_timeout_secs,
7453            config.http_proxy_url.clone(),
7454            config.is_testnet,
7455            None,
7456        )
7457        .unwrap();
7458
7459        let client =
7460            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7461
7462        let initial_capacity = client.order_books.capacity();
7463
7464        for i in 0..100 {
7465            let symbol = format!("INSTRUMENT-{i}");
7466            let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7467            client.order_books.insert(
7468                instrument_id,
7469                OrderBook::new(instrument_id, BookType::L2_MBP),
7470            );
7471        }
7472
7473        assert_eq!(client.order_books.len(), 100);
7474        assert!(client.order_books.capacity() >= initial_capacity);
7475
7476        client.order_books.clear();
7477        assert_eq!(client.order_books.len(), 0);
7478    }
7479
7480    #[rstest]
7481    fn test_instrument_id_validation_rejects_invalid_formats() {
7482        // InstrumentId::from() validates format and panics on invalid input
7483        let test_cases = vec![
7484            ("", "Empty string missing separator"),
7485            ("INVALID", "No venue separator"),
7486            ("NO-VENUE", "No venue separator"),
7487            (".DYDX", "Empty symbol"),
7488            ("SYMBOL.", "Empty venue"),
7489        ];
7490
7491        for (invalid_id, description) in test_cases {
7492            let result = std::panic::catch_unwind(|| InstrumentId::from(invalid_id));
7493            assert!(
7494                result.is_err(),
7495                "Expected {invalid_id} to panic: {description}"
7496            );
7497        }
7498    }
7499
7500    #[rstest]
7501    fn test_instrument_id_validation_accepts_valid_formats() {
7502        let valid_ids = vec![
7503            "BTC-USD-PERP.DYDX",
7504            "ETH-USD-PERP.DYDX",
7505            "SOL-USD.DYDX",
7506            "AVAX-USD-PERP.DYDX",
7507        ];
7508
7509        for valid_id in valid_ids {
7510            let instrument_id = InstrumentId::from(valid_id);
7511            assert!(
7512                !instrument_id.symbol.as_str().is_empty()
7513                    && !instrument_id.venue.as_str().is_empty(),
7514                "Expected {valid_id} to have non-empty symbol and venue"
7515            );
7516        }
7517    }
7518
7519    #[tokio::test]
7520    async fn test_request_bars_with_inverted_date_range() {
7521        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7522        set_data_event_sender(sender);
7523
7524        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7525        let config = DydxDataClientConfig {
7526            base_url_http: Some(base_url),
7527            is_testnet: true,
7528            ..Default::default()
7529        };
7530
7531        let http_client = DydxHttpClient::new(
7532            config.base_url_http.clone(),
7533            config.http_timeout_secs,
7534            config.http_proxy_url.clone(),
7535            config.is_testnet,
7536            None,
7537        )
7538        .unwrap();
7539
7540        let client =
7541            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7542
7543        let instrument = create_test_instrument_any();
7544        let instrument_id = instrument.id();
7545        client
7546            .instruments
7547            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7548
7549        let spec = BarSpecification {
7550            step: std::num::NonZeroUsize::new(1).unwrap(),
7551            aggregation: BarAggregation::Minute,
7552            price_type: PriceType::Last,
7553        };
7554        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7555
7556        let now = Utc::now();
7557        let start = Some(now);
7558        let end = Some(now - chrono::Duration::hours(1));
7559
7560        let request = RequestBars::new(
7561            bar_type,
7562            start,
7563            end,
7564            None,
7565            Some(ClientId::from("dydx_test")),
7566            UUID4::new(),
7567            get_atomic_clock_realtime().get_time_ns(),
7568            None,
7569        );
7570
7571        let result = client.request_bars(&request);
7572        assert!(result.is_ok());
7573    }
7574
7575    #[tokio::test]
7576    async fn test_request_bars_with_zero_limit() {
7577        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7578        set_data_event_sender(sender);
7579
7580        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7581        let config = DydxDataClientConfig {
7582            base_url_http: Some(base_url),
7583            is_testnet: true,
7584            ..Default::default()
7585        };
7586
7587        let http_client = DydxHttpClient::new(
7588            config.base_url_http.clone(),
7589            config.http_timeout_secs,
7590            config.http_proxy_url.clone(),
7591            config.is_testnet,
7592            None,
7593        )
7594        .unwrap();
7595
7596        let client =
7597            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7598
7599        let instrument = create_test_instrument_any();
7600        let instrument_id = instrument.id();
7601        client
7602            .instruments
7603            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7604
7605        let spec = BarSpecification {
7606            step: std::num::NonZeroUsize::new(1).unwrap(),
7607            aggregation: BarAggregation::Minute,
7608            price_type: PriceType::Last,
7609        };
7610        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7611
7612        let request = RequestBars::new(
7613            bar_type,
7614            None,
7615            None,
7616            Some(std::num::NonZeroUsize::new(1).unwrap()),
7617            Some(ClientId::from("dydx_test")),
7618            UUID4::new(),
7619            get_atomic_clock_realtime().get_time_ns(),
7620            None,
7621        );
7622
7623        let result = client.request_bars(&request);
7624        assert!(result.is_ok());
7625    }
7626
7627    #[tokio::test]
7628    async fn test_request_trades_with_excessive_limit() {
7629        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7630        set_data_event_sender(sender);
7631
7632        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7633        let config = DydxDataClientConfig {
7634            base_url_http: Some(base_url),
7635            is_testnet: true,
7636            ..Default::default()
7637        };
7638
7639        let http_client = DydxHttpClient::new(
7640            config.base_url_http.clone(),
7641            config.http_timeout_secs,
7642            config.http_proxy_url.clone(),
7643            config.is_testnet,
7644            None,
7645        )
7646        .unwrap();
7647
7648        let client =
7649            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7650
7651        let instrument = create_test_instrument_any();
7652        let instrument_id = instrument.id();
7653        client
7654            .instruments
7655            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7656
7657        let request = RequestTrades::new(
7658            instrument_id,
7659            None,
7660            None,
7661            Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7662            Some(ClientId::from("dydx_test")),
7663            UUID4::new(),
7664            get_atomic_clock_realtime().get_time_ns(),
7665            None,
7666        );
7667
7668        let result = client.request_trades(&request);
7669        assert!(result.is_ok());
7670    }
7671
7672    #[rstest]
7673    fn test_candle_topic_format() {
7674        let instrument_id = InstrumentId::new(Symbol::from("BTC-USD-PERP"), Venue::from("DYDX"));
7675        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
7676        let resolution = "1MIN";
7677        let topic = format!("{ticker}/{resolution}");
7678
7679        assert_eq!(topic, "BTC-USD/1MIN");
7680        assert!(!topic.contains("-PERP"));
7681        assert!(!topic.contains(".DYDX"));
7682    }
7683}