nautilus_dydx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the dYdX adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::DashMap;
25use nautilus_common::{
26    clients::DataClient,
27    live::{runner::get_data_event_sender, runtime::get_runtime},
28    messages::{
29        DataEvent, DataResponse,
30        data::{
31            BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
32            RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
33            SubscribeInstrument, SubscribeInstruments, SubscribeQuotes, SubscribeTrades,
34            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeInstrument,
35            UnsubscribeInstruments, UnsubscribeQuotes, UnsubscribeTrades,
36        },
37    },
38};
39use nautilus_core::{
40    UnixNanos,
41    datetime::datetime_to_unix_nanos,
42    time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45    data::{
46        Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, IndexPriceUpdate,
47        OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, QuoteTick, TradeTick,
48    },
49    enums::{
50        AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
51        PriceType, RecordFlag,
52    },
53    identifiers::{ClientId, InstrumentId, TradeId, Venue},
54    instruments::{Instrument, InstrumentAny},
55    orderbook::OrderBook,
56    types::{Price, Quantity},
57};
58use rust_decimal::Decimal;
59use tokio::{task::JoinHandle, time::Duration};
60use tokio_util::sync::CancellationToken;
61use ustr::Ustr;
62
63use crate::{
64    common::{consts::DYDX_VENUE, parse::extract_raw_symbol},
65    config::DydxDataClientConfig,
66    http::client::DydxHttpClient,
67    types::DydxOraclePrice,
68    websocket::client::DydxWebSocketClient,
69};
70
71/// Groups WebSocket message handling dependencies.
72struct WsMessageContext {
73    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
74    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
75    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
76    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
77    ws_client: DydxWebSocketClient,
78    active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
79    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
80    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
81    incomplete_bars: Arc<DashMap<BarType, Bar>>,
82}
83
84/// dYdX data client for live market data streaming and historical data requests.
85///
86/// This client integrates with the Nautilus DataEngine to provide:
87/// - Real-time market data via WebSocket subscriptions
88/// - Historical data via REST API requests
89/// - Automatic instrument discovery and caching
90/// - Connection lifecycle management
91#[derive(Debug)]
92pub struct DydxDataClient {
93    /// The client ID for this data client.
94    client_id: ClientId,
95    /// Configuration for the data client.
96    config: DydxDataClientConfig,
97    /// HTTP client for REST API requests.
98    http_client: DydxHttpClient,
99    /// WebSocket client for real-time data streaming.
100    ws_client: DydxWebSocketClient,
101    /// Whether the client is currently connected.
102    is_connected: AtomicBool,
103    /// Cancellation token for async operations.
104    cancellation_token: CancellationToken,
105    /// Background task handles.
106    tasks: Vec<JoinHandle<()>>,
107    /// Channel sender for emitting data events to the DataEngine.
108    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
109    /// Cached instruments by symbol (shared with HTTP client via `Arc<DashMap<Ustr, InstrumentAny>>`).
110    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
111    /// High-resolution clock for timestamps.
112    clock: &'static AtomicTime,
113    /// Local order books maintained for generating quotes and resolving crosses.
114    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
115    /// Last quote tick per instrument (used for quote generation from book deltas).
116    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
117    /// Incomplete bars cache for bar aggregation.
118    /// Tracks bars not yet closed (ts_event > current_time), keyed by BarType.
119    /// Bars are emitted only when they close (ts_event <= current_time).
120    incomplete_bars: Arc<DashMap<BarType, Bar>>,
121    /// WebSocket topic to BarType mappings.
122    /// Maps dYdX candle topics (e.g., "BTC-USD/1MIN") to Nautilus BarType.
123    /// Used for subscription validation and reconnection recovery.
124    bar_type_mappings: Arc<DashMap<String, BarType>>,
125    /// Active orderbook subscriptions for periodic snapshot refresh.
126    active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
127    /// Active trade subscriptions for reconnection recovery.
128    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
129    /// Active bar/candle subscriptions for reconnection recovery (maps instrument+resolution to BarType).
130    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
131}
132
133impl DydxDataClient {
134    /// Maps Nautilus BarType spec to dYdX candle resolution string.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if the bar aggregation or step is not supported by dYdX.
139    fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
140        match spec.step.get() {
141            1 => match spec.aggregation {
142                BarAggregation::Minute => Ok("1MIN"),
143                BarAggregation::Hour => Ok("1HOUR"),
144                BarAggregation::Day => Ok("1DAY"),
145                _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
146            },
147            5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
148            15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
149            30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
150            4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
151            step => anyhow::bail!(
152                "Unsupported bar step: {step} with aggregation {:?}",
153                spec.aggregation
154            ),
155        }
156    }
157
158    /// Creates a new [`DydxDataClient`] instance.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the client fails to initialize.
163    pub fn new(
164        client_id: ClientId,
165        config: DydxDataClientConfig,
166        http_client: DydxHttpClient,
167        ws_client: DydxWebSocketClient,
168    ) -> anyhow::Result<Self> {
169        let clock = get_atomic_clock_realtime();
170        let data_sender = get_data_event_sender();
171
172        // Clone the instruments cache before moving http_client
173        let instruments_cache = http_client.instruments().clone();
174
175        Ok(Self {
176            client_id,
177            config,
178            http_client,
179            ws_client,
180            is_connected: AtomicBool::new(false),
181            cancellation_token: CancellationToken::new(),
182            tasks: Vec::new(),
183            data_sender,
184            instruments: instruments_cache,
185            clock,
186            order_books: Arc::new(DashMap::new()),
187            last_quotes: Arc::new(DashMap::new()),
188            incomplete_bars: Arc::new(DashMap::new()),
189            bar_type_mappings: Arc::new(DashMap::new()),
190            active_orderbook_subs: Arc::new(DashMap::new()),
191            active_trade_subs: Arc::new(DashMap::new()),
192            active_bar_subs: Arc::new(DashMap::new()),
193        })
194    }
195
196    /// Returns the venue for this data client.
197    #[must_use]
198    pub fn venue(&self) -> Venue {
199        *DYDX_VENUE
200    }
201
202    /// Returns `true` when the client is connected.
203    #[must_use]
204    pub fn is_connected(&self) -> bool {
205        self.is_connected.load(Ordering::Relaxed)
206    }
207
208    /// Spawns an async WebSocket task with error handling.
209    ///
210    /// This helper ensures consistent error logging across all subscription methods.
211    fn spawn_ws<F>(&self, fut: F, context: &'static str)
212    where
213        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
214    {
215        get_runtime().spawn(async move {
216            if let Err(e) = fut.await {
217                log::error!("{context}: {e:?}");
218            }
219        });
220    }
221
222    /// Bootstrap instruments from the dYdX Indexer API.
223    ///
224    /// This method:
225    /// 1. Fetches all available instruments from the REST API
226    /// 2. Caches them in the HTTP client
227    /// 3. Caches them in the WebSocket client (if present)
228    /// 4. Populates the local instruments cache
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if:
233    /// - The HTTP request fails.
234    /// - Instrument parsing fails.
235    ///
236    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
237        log::info!("Bootstrapping dYdX instruments");
238
239        // Populates all HTTP cache layers (instruments, clob_pair_id, market_params)
240        self.http_client
241            .fetch_and_cache_instruments()
242            .await
243            .context("failed to load instruments from dYdX")?;
244
245        let instruments: Vec<InstrumentAny> = self
246            .http_client
247            .instruments()
248            .iter()
249            .map(|entry| entry.value().clone())
250            .collect();
251
252        if instruments.is_empty() {
253            log::warn!("No dYdX instruments were loaded");
254            return Ok(instruments);
255        }
256
257        log::info!("Loaded {} dYdX instruments", instruments.len());
258
259        self.ws_client.cache_instruments(instruments.clone());
260
261        Ok(instruments)
262    }
263}
264
265#[async_trait::async_trait(?Send)]
266impl DataClient for DydxDataClient {
267    fn client_id(&self) -> ClientId {
268        self.client_id
269    }
270
271    fn venue(&self) -> Option<Venue> {
272        Some(*DYDX_VENUE)
273    }
274
275    fn start(&mut self) -> anyhow::Result<()> {
276        log::info!(
277            "Starting dYdX data client: client_id={}, is_testnet={}",
278            self.client_id,
279            self.http_client.is_testnet()
280        );
281        Ok(())
282    }
283
284    fn stop(&mut self) -> anyhow::Result<()> {
285        log::info!("Stopping dYdX data client {}", self.client_id);
286        self.cancellation_token.cancel();
287        self.is_connected.store(false, Ordering::Relaxed);
288        Ok(())
289    }
290
291    fn reset(&mut self) -> anyhow::Result<()> {
292        log::debug!("Resetting dYdX data client {}", self.client_id);
293        self.is_connected.store(false, Ordering::Relaxed);
294        self.cancellation_token = CancellationToken::new();
295        self.tasks.clear();
296        Ok(())
297    }
298
299    fn dispose(&mut self) -> anyhow::Result<()> {
300        log::debug!("Disposing dYdX data client {}", self.client_id);
301        self.stop()
302    }
303
304    async fn connect(&mut self) -> anyhow::Result<()> {
305        if self.is_connected() {
306            return Ok(());
307        }
308
309        log::info!("Connecting dYdX data client");
310
311        // Bootstrap instruments first
312        self.bootstrap_instruments().await?;
313
314        // Connect WebSocket client and subscribe to market updates
315        self.ws_client
316            .connect()
317            .await
318            .context("failed to connect dYdX websocket")?;
319
320        self.ws_client
321            .subscribe_markets()
322            .await
323            .context("failed to subscribe to markets channel")?;
324
325        // Start message processing task (handler already converts to NautilusWsMessage)
326        if let Some(rx) = self.ws_client.take_receiver() {
327            let data_tx = self.data_sender.clone();
328            let instruments = self.instruments.clone();
329            let order_books = self.order_books.clone();
330            let last_quotes = self.last_quotes.clone();
331            let ws_client = self.ws_client.clone();
332            let active_orderbook_subs = self.active_orderbook_subs.clone();
333            let active_trade_subs = self.active_trade_subs.clone();
334            let active_bar_subs = self.active_bar_subs.clone();
335            let incomplete_bars = self.incomplete_bars.clone();
336
337            let ctx = WsMessageContext {
338                data_sender: data_tx,
339                instruments,
340                order_books,
341                last_quotes,
342                ws_client,
343                active_orderbook_subs,
344                active_trade_subs,
345                active_bar_subs,
346                incomplete_bars,
347            };
348
349            let task = get_runtime().spawn(async move {
350                let mut rx = rx;
351                while let Some(msg) = rx.recv().await {
352                    Self::handle_ws_message(msg, &ctx);
353                }
354            });
355            self.tasks.push(task);
356        } else {
357            log::warn!("No inbound WS receiver available after connect");
358        }
359
360        // Start orderbook snapshot refresh task
361        self.start_orderbook_refresh_task()?;
362
363        // Start instrument refresh task
364        self.start_instrument_refresh_task()?;
365
366        self.is_connected.store(true, Ordering::Relaxed);
367        log::info!("Connected dYdX data client");
368
369        Ok(())
370    }
371
372    async fn disconnect(&mut self) -> anyhow::Result<()> {
373        if !self.is_connected() {
374            return Ok(());
375        }
376
377        log::info!("Disconnecting dYdX data client");
378
379        self.ws_client
380            .disconnect()
381            .await
382            .context("failed to disconnect dYdX websocket")?;
383
384        self.is_connected.store(false, Ordering::Relaxed);
385        log::info!("Disconnected dYdX data client");
386
387        Ok(())
388    }
389
390    fn is_connected(&self) -> bool {
391        self.is_connected.load(Ordering::Relaxed)
392    }
393
394    fn is_disconnected(&self) -> bool {
395        !self.is_connected()
396    }
397
398    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
399        // dYdX uses a global markets channel which streams instruments implicitly.
400        // There is no dedicated instruments subscription, so this is a no-op to
401        // mirror the behaviour of `subscribe_instruments`.
402        log::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
403        Ok(())
404    }
405
406    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
407        // dYdX does not support per-instrument instrument feed subscriptions.
408        // The markets channel always streams all instruments, so this is a no-op.
409        log::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
410        Ok(())
411    }
412
413    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
414        // dYdX markets channel auto-subscribes to all instruments
415        // No explicit subscription needed - already handled in connect()
416        log::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
417        Ok(())
418    }
419
420    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
421        // dYdX markets channel auto-subscribes to all instruments
422        // Individual instrument subscriptions not supported - full feed only
423        log::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
424        Ok(())
425    }
426
427    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
428        let ws = self.ws_client.clone();
429        let instrument_id = cmd.instrument_id;
430
431        // Track active subscription for reconnection recovery
432        self.active_trade_subs.insert(instrument_id, ());
433
434        self.spawn_ws(
435            async move {
436                ws.subscribe_trades(instrument_id)
437                    .await
438                    .context("trade subscription")
439            },
440            "dYdX trade subscription",
441        );
442
443        Ok(())
444    }
445
446    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
447        if cmd.book_type != BookType::L2_MBP {
448            anyhow::bail!(
449                "dYdX only supports L2_MBP order book deltas, received {:?}",
450                cmd.book_type
451            );
452        }
453
454        // Ensure local order book exists for this instrument.
455        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
456
457        // Track active subscription for periodic refresh
458        self.active_orderbook_subs.insert(cmd.instrument_id, ());
459
460        let ws = self.ws_client.clone();
461        let instrument_id = cmd.instrument_id;
462
463        self.spawn_ws(
464            async move {
465                ws.subscribe_orderbook(instrument_id)
466                    .await
467                    .context("orderbook subscription")
468            },
469            "dYdX orderbook subscription",
470        );
471
472        Ok(())
473    }
474
475    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
476        // dYdX doesn't have a dedicated quotes channel
477        // Quotes are synthesized from order book deltas
478        log::debug!(
479            "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
480            cmd.instrument_id
481        );
482
483        // Simply delegate to book deltas subscription
484        let book_cmd = SubscribeBookDeltas {
485            client_id: cmd.client_id,
486            venue: cmd.venue,
487            instrument_id: cmd.instrument_id,
488            book_type: BookType::L2_MBP,
489            depth: None,
490            managed: false,
491            correlation_id: None,
492            params: None,
493            command_id: cmd.command_id,
494            ts_init: cmd.ts_init,
495        };
496
497        self.subscribe_book_deltas(&book_cmd)
498    }
499
500    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
501        let ws = self.ws_client.clone();
502        let instrument_id = cmd.bar_type.instrument_id();
503        let spec = cmd.bar_type.spec();
504
505        // Use centralized bar spec mapping
506        let resolution = Self::map_bar_spec_to_resolution(&spec)?;
507
508        // Track active subscription for reconnection recovery
509        let bar_type = cmd.bar_type;
510        self.active_bar_subs
511            .insert((instrument_id, resolution.to_string()), bar_type);
512
513        // Register topic → BarType mapping for validation and lookup
514        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
515        let topic = format!("{ticker}/{resolution}");
516        self.bar_type_mappings.insert(topic.clone(), bar_type);
517
518        self.spawn_ws(
519            async move {
520                // Register bar type in handler BEFORE subscribing to avoid race condition
521                if let Err(e) =
522                    ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
523                        topic,
524                        bar_type,
525                    })
526                {
527                    anyhow::bail!("Failed to register bar type: {e}");
528                }
529
530                // Delay to ensure handler processes registration before candle messages arrive
531                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
532
533                ws.subscribe_candles(instrument_id, resolution)
534                    .await
535                    .context("candles subscription")
536            },
537            "dYdX candles subscription",
538        );
539
540        Ok(())
541    }
542
543    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
544        // Remove from active subscription tracking
545        self.active_trade_subs.remove(&cmd.instrument_id);
546
547        let ws = self.ws_client.clone();
548        let instrument_id = cmd.instrument_id;
549
550        self.spawn_ws(
551            async move {
552                ws.unsubscribe_trades(instrument_id)
553                    .await
554                    .context("trade unsubscription")
555            },
556            "dYdX trade unsubscription",
557        );
558
559        Ok(())
560    }
561
562    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
563        // Remove from active subscription tracking
564        self.active_orderbook_subs.remove(&cmd.instrument_id);
565
566        let ws = self.ws_client.clone();
567        let instrument_id = cmd.instrument_id;
568
569        self.spawn_ws(
570            async move {
571                ws.unsubscribe_orderbook(instrument_id)
572                    .await
573                    .context("orderbook unsubscription")
574            },
575            "dYdX orderbook unsubscription",
576        );
577
578        Ok(())
579    }
580
581    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
582        // dYdX doesn't have a dedicated quotes channel; quotes are derived from book deltas.
583        log::debug!(
584            "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
585            cmd.instrument_id
586        );
587
588        let book_cmd = UnsubscribeBookDeltas {
589            instrument_id: cmd.instrument_id,
590            client_id: cmd.client_id,
591            venue: cmd.venue,
592            command_id: cmd.command_id,
593            ts_init: cmd.ts_init,
594            correlation_id: None,
595            params: cmd.params.clone(),
596        };
597
598        self.unsubscribe_book_deltas(&book_cmd)
599    }
600
601    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
602        let ws = self.ws_client.clone();
603        let instrument_id = cmd.bar_type.instrument_id();
604        let spec = cmd.bar_type.spec();
605
606        // Map BarType spec to dYdX candle resolution string
607        let resolution = match spec.step.get() {
608            1 => match spec.aggregation {
609                BarAggregation::Minute => "1MIN",
610                BarAggregation::Hour => "1HOUR",
611                BarAggregation::Day => "1DAY",
612                _ => {
613                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
614                }
615            },
616            5 => {
617                if spec.aggregation == BarAggregation::Minute {
618                    "5MINS"
619                } else {
620                    anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
621                }
622            }
623            15 => {
624                if spec.aggregation == BarAggregation::Minute {
625                    "15MINS"
626                } else {
627                    anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
628                }
629            }
630            30 => {
631                if spec.aggregation == BarAggregation::Minute {
632                    "30MINS"
633                } else {
634                    anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
635                }
636            }
637            4 => {
638                if spec.aggregation == BarAggregation::Hour {
639                    "4HOURS"
640                } else {
641                    anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
642                }
643            }
644            step => {
645                anyhow::bail!("Unsupported bar step: {step}");
646            }
647        };
648
649        // Remove from active subscription tracking
650        self.active_bar_subs
651            .remove(&(instrument_id, resolution.to_string()));
652
653        // Unregister bar type from handler and local mappings
654        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
655        let topic = format!("{ticker}/{resolution}");
656        self.bar_type_mappings.remove(&topic);
657
658        if let Err(e) =
659            ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
660        {
661            log::warn!("Failed to unregister bar type: {e}");
662        }
663
664        self.spawn_ws(
665            async move {
666                ws.unsubscribe_candles(instrument_id, resolution)
667                    .await
668                    .context("candles unsubscription")
669            },
670            "dYdX candles unsubscription",
671        );
672
673        Ok(())
674    }
675
676    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
677        let instruments_cache = self.instruments.clone();
678        let sender = self.data_sender.clone();
679        let http = self.http_client.clone();
680        let instrument_id = request.instrument_id;
681        let request_id = request.request_id;
682        let client_id = request.client_id.unwrap_or(self.client_id);
683        let start = request.start;
684        let end = request.end;
685        let params = request.params.clone();
686        let clock = self.clock;
687        let start_nanos = datetime_to_unix_nanos(start);
688        let end_nanos = datetime_to_unix_nanos(end);
689
690        get_runtime().spawn(async move {
691            // First try to get from cache
692            let symbol = Ustr::from(instrument_id.symbol.as_str());
693            let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
694                log::debug!("Found instrument {instrument_id} in cache");
695                Some(cached.clone())
696            } else {
697                // Not in cache, fetch from API
698                log::debug!("Instrument {instrument_id} not in cache, fetching from API");
699                match http.request_instruments(None, None, None).await {
700                    Ok(instruments) => {
701                        // Cache all fetched instruments
702                        for inst in &instruments {
703                            upsert_instrument(&instruments_cache, inst.clone());
704                        }
705                        // Find the requested instrument
706                        instruments.into_iter().find(|i| i.id() == instrument_id)
707                    }
708                    Err(e) => {
709                        log::error!("Failed to fetch instruments from dYdX: {e:?}");
710                        None
711                    }
712                }
713            };
714
715            if let Some(inst) = instrument {
716                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
717                    request_id,
718                    client_id,
719                    instrument_id,
720                    inst,
721                    start_nanos,
722                    end_nanos,
723                    clock.get_time_ns(),
724                    params,
725                )));
726
727                if let Err(e) = sender.send(DataEvent::Response(response)) {
728                    log::error!("Failed to send instrument response: {e}");
729                }
730            } else {
731                log::error!("Instrument {instrument_id} not found");
732            }
733        });
734
735        Ok(())
736    }
737
738    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
739        let http = self.http_client.clone();
740        let sender = self.data_sender.clone();
741        let instruments_cache = self.instruments.clone();
742        let request_id = request.request_id;
743        let client_id = request.client_id.unwrap_or(self.client_id);
744        let venue = self.venue();
745        let start = request.start;
746        let end = request.end;
747        let params = request.params.clone();
748        let clock = self.clock;
749        let start_nanos = datetime_to_unix_nanos(start);
750        let end_nanos = datetime_to_unix_nanos(end);
751
752        get_runtime().spawn(async move {
753            match http.request_instruments(None, None, None).await {
754                Ok(instruments) => {
755                    log::info!("Fetched {} instruments from dYdX", instruments.len());
756
757                    // Cache all instruments
758                    for instrument in &instruments {
759                        upsert_instrument(&instruments_cache, instrument.clone());
760                    }
761
762                    let response = DataResponse::Instruments(InstrumentsResponse::new(
763                        request_id,
764                        client_id,
765                        venue,
766                        instruments,
767                        start_nanos,
768                        end_nanos,
769                        clock.get_time_ns(),
770                        params,
771                    ));
772
773                    if let Err(e) = sender.send(DataEvent::Response(response)) {
774                        log::error!("Failed to send instruments response: {e}");
775                    }
776                }
777                Err(e) => {
778                    log::error!("Failed to fetch instruments from dYdX: {e:?}");
779
780                    // Send empty response on error
781                    let response = DataResponse::Instruments(InstrumentsResponse::new(
782                        request_id,
783                        client_id,
784                        venue,
785                        Vec::new(),
786                        start_nanos,
787                        end_nanos,
788                        clock.get_time_ns(),
789                        params,
790                    ));
791
792                    if let Err(e) = sender.send(DataEvent::Response(response)) {
793                        log::error!("Failed to send empty instruments response: {e}");
794                    }
795                }
796            }
797        });
798
799        Ok(())
800    }
801
802    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
803        let http = self.http_client.clone();
804        let instruments = self.instruments.clone();
805        let sender = self.data_sender.clone();
806        let instrument_id = request.instrument_id;
807        let start = request.start;
808        let end = request.end;
809        let limit = request.limit.map(|n| n.get() as u32);
810        let request_id = request.request_id;
811        let client_id = request.client_id.unwrap_or(self.client_id);
812        let params = request.params.clone();
813        let clock = self.clock;
814        let start_nanos = datetime_to_unix_nanos(start);
815        let end_nanos = datetime_to_unix_nanos(end);
816
817        get_runtime().spawn(async move {
818            // dYdX Indexer trades endpoint supports `limit` but not an explicit
819            // date range in this client; we approximate by using the provided
820            // limit and instrument metadata for precision.
821            let ticker = instrument_id
822                .symbol
823                .as_str()
824                .trim_end_matches("-PERP")
825                .to_string();
826
827            // Look up instrument to derive price and size precision.
828            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
829                Some(inst) => inst.clone(),
830                None => {
831                    log::error!(
832                        "request_trades: instrument {instrument_id} not found in cache; cannot convert trades"
833                    );
834                    let ts_now = clock.get_time_ns();
835                    let response = DataResponse::Trades(TradesResponse::new(
836                        request_id,
837                        client_id,
838                        instrument_id,
839                        Vec::new(),
840                        start_nanos,
841                        end_nanos,
842                        ts_now,
843                        params,
844                    ));
845                    if let Err(e) = sender.send(DataEvent::Response(response)) {
846                        log::error!("Failed to send empty trades response: {e}");
847                    }
848                    return;
849                }
850            };
851
852            let price_precision = instrument.price_precision();
853            let size_precision = instrument.size_precision();
854
855            match http
856                .inner
857                .get_trades(&ticker, limit)
858                .await
859                .context("failed to request trades from dYdX")
860            {
861                Ok(trades_response) => {
862                    let mut ticks = Vec::new();
863
864                    for trade in trades_response.trades {
865                        let aggressor_side = match trade.side {
866                            OrderSide::Buy => AggressorSide::Buyer,
867                            OrderSide::Sell => AggressorSide::Seller,
868                            _ => continue, // Skip unsupported side
869                        };
870
871                        let price = match Price::from_decimal_dp(trade.price, price_precision) {
872                            Ok(p) => p,
873                            Err(e) => {
874                                log::warn!(
875                                    "request_trades: failed to convert price for trade {}: {e}",
876                                    trade.id
877                                );
878                                continue;
879                            }
880                        };
881
882                        let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
883                            Ok(q) => q,
884                            Err(e) => {
885                                log::warn!(
886                                    "request_trades: failed to convert size for trade {}: {e}",
887                                    trade.id
888                                );
889                                continue;
890                            }
891                        };
892
893                        let ts_event = match trade.created_at.timestamp_nanos_opt() {
894                            Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
895                            _ => {
896                                log::warn!(
897                                    "request_trades: timestamp out of range for trade {}",
898                                    trade.id
899                                );
900                                continue;
901                            }
902                        };
903
904                        // Apply optional time-range filter.
905                        if let Some(start_ts) = start_nanos
906                            && ts_event < start_ts
907                        {
908                            continue;
909                        }
910                        if let Some(end_ts) = end_nanos
911                            && ts_event > end_ts
912                        {
913                            continue;
914                        }
915
916                        let tick = TradeTick::new(
917                            instrument_id,
918                            price,
919                            size,
920                            aggressor_side,
921                            TradeId::new(&trade.id),
922                            ts_event,
923                            clock.get_time_ns(),
924                        );
925                        ticks.push(tick);
926                    }
927
928                    let response = DataResponse::Trades(TradesResponse::new(
929                        request_id,
930                        client_id,
931                        instrument_id,
932                        ticks,
933                        start_nanos,
934                        end_nanos,
935                        clock.get_time_ns(),
936                        params,
937                    ));
938
939                    if let Err(e) = sender.send(DataEvent::Response(response)) {
940                        log::error!("Failed to send trades response: {e}");
941                    }
942                }
943                Err(e) => {
944                    log::error!("Trade request failed for {instrument_id}: {e:?}");
945
946                    let response = DataResponse::Trades(TradesResponse::new(
947                        request_id,
948                        client_id,
949                        instrument_id,
950                        Vec::new(),
951                        start_nanos,
952                        end_nanos,
953                        clock.get_time_ns(),
954                        params,
955                    ));
956
957                    if let Err(e) = sender.send(DataEvent::Response(response)) {
958                        log::error!("Failed to send empty trades response: {e}");
959                    }
960                }
961            }
962        });
963
964        Ok(())
965    }
966
967    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
968        const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
969
970        let bar_type = request.bar_type;
971        let spec = bar_type.spec();
972
973        // Validate bar type requirements
974        if bar_type.aggregation_source() != AggregationSource::External {
975            anyhow::bail!(
976                "dYdX only supports EXTERNAL aggregation, got {:?}",
977                bar_type.aggregation_source()
978            );
979        }
980
981        if spec.price_type != PriceType::Last {
982            anyhow::bail!(
983                "dYdX only supports LAST price type, got {:?}",
984                spec.price_type
985            );
986        }
987
988        // Map BarType spec to dYdX resolution
989        let resolution = match spec.step.get() {
990            1 => match spec.aggregation {
991                BarAggregation::Minute => "1MIN",
992                BarAggregation::Hour => "1HOUR",
993                BarAggregation::Day => "1DAY",
994                _ => {
995                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
996                }
997            },
998            5 if spec.aggregation == BarAggregation::Minute => "5MINS",
999            15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1000            30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1001            4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1002            step => {
1003                anyhow::bail!("Unsupported bar step: {step}");
1004            }
1005        };
1006
1007        let http = self.http_client.clone();
1008        let instruments = self.instruments.clone();
1009        let sender = self.data_sender.clone();
1010        let instrument_id = bar_type.instrument_id();
1011        // dYdX ticker does not include the "-PERP" suffix.
1012        let symbol = instrument_id
1013            .symbol
1014            .as_str()
1015            .trim_end_matches("-PERP")
1016            .to_string();
1017        let request_id = request.request_id;
1018        let client_id = request.client_id.unwrap_or(self.client_id);
1019        let params = request.params.clone();
1020        let clock = self.clock;
1021
1022        let start = request.start;
1023        let end = request.end;
1024        let overall_limit = request.limit.map(|n| n.get() as u32);
1025
1026        // Convert optional datetimes to UnixNanos for response metadata
1027        let start_nanos = datetime_to_unix_nanos(start);
1028        let end_nanos = datetime_to_unix_nanos(end);
1029
1030        // Parse resolution string to DydxCandleResolution enum
1031        let resolution_enum = match resolution {
1032            "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1033            "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1034            "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1035            "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1036            "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1037            "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1038            "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1039            _ => {
1040                anyhow::bail!("Unsupported resolution: {resolution}");
1041            }
1042        };
1043
1044        get_runtime().spawn(async move {
1045            // Determine bar duration in seconds.
1046            let bar_secs: i64 = match spec.aggregation {
1047                BarAggregation::Minute => spec.step.get() as i64 * 60,
1048                BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1049                BarAggregation::Day => spec.step.get() as i64 * 86_400,
1050                _ => {
1051                    log::error!(
1052                        "Unsupported aggregation for request_bars: {:?}",
1053                        spec.aggregation
1054                    );
1055                    return;
1056                }
1057            };
1058
1059            // Look up instrument to derive price and size precision.
1060            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1061                Some(inst) => inst.clone(),
1062                None => {
1063                    log::error!(
1064                        "request_bars: instrument {instrument_id} not found in cache; cannot convert candles"
1065                    );
1066                    let ts_now = clock.get_time_ns();
1067                    let response = DataResponse::Bars(BarsResponse::new(
1068                        request_id,
1069                        client_id,
1070                        bar_type,
1071                        Vec::new(),
1072                        start_nanos,
1073                        end_nanos,
1074                        ts_now,
1075                        params,
1076                    ));
1077                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1078                        log::error!("Failed to send empty bars response: {e}");
1079                    }
1080                    return;
1081                }
1082            };
1083
1084            let price_precision = instrument.price_precision();
1085            let size_precision = instrument.size_precision();
1086
1087            let mut all_bars: Vec<Bar> = Vec::new();
1088
1089            // If no explicit date range, fall back to a single request using only `limit`.
1090            let (range_start, range_end) = match (start, end) {
1091                (Some(s), Some(e)) if e > s => (s, e),
1092                _ => {
1093                    let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1094                    match http
1095                        .inner
1096                        .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1097                        .await
1098                    {
1099                        Ok(candles_response) => {
1100                            log::debug!(
1101                                "request_bars fetched {} candles without explicit date range",
1102                                candles_response.candles.len()
1103                            );
1104
1105                            for candle in &candles_response.candles {
1106                                match Self::candle_to_bar(
1107                                    candle,
1108                                    bar_type,
1109                                    price_precision,
1110                                    size_precision,
1111                                    bar_secs,
1112                                    clock,
1113                                ) {
1114                                    Ok(bar) => all_bars.push(bar),
1115                                    Err(e) => {
1116                                        log::warn!(
1117                                            "Failed to convert dYdX candle to bar for {instrument_id}: {e}"
1118                                        );
1119                                    }
1120                                }
1121                            }
1122
1123                            let current_time_ns = clock.get_time_ns();
1124                            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1125
1126                            let response = DataResponse::Bars(BarsResponse::new(
1127                                request_id,
1128                                client_id,
1129                                bar_type,
1130                                all_bars,
1131                                start_nanos,
1132                                end_nanos,
1133                                current_time_ns,
1134                                params,
1135                            ));
1136
1137                            if let Err(e) = sender.send(DataEvent::Response(response)) {
1138                                log::error!("Failed to send bars response: {e}");
1139                            }
1140                        }
1141                        Err(e) => {
1142                            log::error!(
1143                                "Failed to request candles for {symbol} without date range: {e:?}"
1144                            );
1145                        }
1146                    }
1147                    return;
1148                }
1149            };
1150
1151            // Calculate expected bars for the range.
1152            let total_secs = (range_end - range_start).num_seconds().max(0);
1153            let expected_bars = (total_secs / bar_secs).max(1) as u64;
1154
1155            log::debug!(
1156                "request_bars range {range_start:?} -> {range_end:?}, expected_bars ~= {expected_bars}"
1157            );
1158
1159            let mut remaining = overall_limit.unwrap_or(u32::MAX);
1160
1161            // Determine chunk duration using max bars per request.
1162            let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1163            let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1164
1165            let mut chunk_start = range_start;
1166
1167            while chunk_start < range_end && remaining > 0 {
1168                let mut chunk_end = chunk_start + chunk_duration;
1169                if chunk_end > range_end {
1170                    chunk_end = range_end;
1171                }
1172
1173                let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1174
1175                log::debug!(
1176                    "request_bars chunk: {chunk_start} -> {chunk_end}, limit={per_call_limit}"
1177                );
1178
1179                match http
1180                    .inner
1181                    .get_candles(
1182                        &symbol,
1183                        resolution_enum,
1184                        Some(per_call_limit),
1185                        Some(chunk_start),
1186                        Some(chunk_end),
1187                    )
1188                    .await
1189                {
1190                    Ok(candles_response) => {
1191                        let count = candles_response.candles.len() as u32;
1192
1193                        if count == 0 {
1194                            // No more data available; stop early.
1195                            break;
1196                        }
1197
1198                        // Convert candles to bars and accumulate.
1199                        for candle in &candles_response.candles {
1200                            match Self::candle_to_bar(
1201                                candle,
1202                                bar_type,
1203                                price_precision,
1204                                size_precision,
1205                                bar_secs,
1206                                clock,
1207                            ) {
1208                                Ok(bar) => all_bars.push(bar),
1209                                Err(e) => {
1210                                    log::warn!(
1211                                        "Failed to convert dYdX candle to bar for {instrument_id}: {e}"
1212                                    );
1213                                }
1214                            }
1215                        }
1216
1217                        if remaining <= count {
1218                            break;
1219                        } else {
1220                            remaining -= count;
1221                        }
1222                    }
1223                    Err(e) => {
1224                        log::error!(
1225                            "Failed to request candles for {symbol} in chunk {chunk_start:?} -> {chunk_end:?}: {e:?}"
1226                        );
1227                        break;
1228                    }
1229                }
1230
1231                chunk_start += chunk_duration;
1232            }
1233
1234            log::debug!("request_bars completed partitioned fetch for {bar_type}");
1235
1236            // Filter incomplete bars: only return bars where ts_event < current_time_ns
1237            let current_time_ns = clock.get_time_ns();
1238            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1239
1240            log::debug!(
1241                "request_bars filtered to {} completed bars (current_time_ns={})",
1242                all_bars.len(),
1243                current_time_ns
1244            );
1245
1246            let response = DataResponse::Bars(BarsResponse::new(
1247                request_id,
1248                client_id,
1249                bar_type,
1250                all_bars,
1251                start_nanos,
1252                end_nanos,
1253                current_time_ns,
1254                params,
1255            ));
1256
1257            if let Err(e) = sender.send(DataEvent::Response(response)) {
1258                log::error!("Failed to send bars response: {e}");
1259            }
1260        });
1261
1262        Ok(())
1263    }
1264}
1265
1266/// Upserts an instrument into the shared cache.
1267fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1268    let symbol = Ustr::from(instrument.id().symbol.as_str());
1269    cache.insert(symbol, instrument);
1270}
1271
1272impl DydxDataClient {
1273    /// Start a task to periodically refresh instruments.
1274    ///
1275    /// This task runs in the background and updates the instrument cache
1276    /// at the configured interval.
1277    ///
1278    /// # Errors
1279    ///
1280    /// Returns an error if a refresh task is already running.
1281    pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1282        let interval_secs = match self.config.instrument_refresh_interval_secs {
1283            Some(secs) if secs > 0 => secs,
1284            _ => {
1285                log::info!("Instrument refresh disabled (interval not configured)");
1286                return Ok(());
1287            }
1288        };
1289
1290        let interval = Duration::from_secs(interval_secs);
1291        let http_client = self.http_client.clone();
1292        let instruments_cache = self.instruments.clone();
1293        let ws_client = self.ws_client.clone();
1294        let cancellation_token = self.cancellation_token.clone();
1295
1296        log::info!("Starting instrument refresh task (interval: {interval_secs}s)");
1297
1298        let task = get_runtime().spawn(async move {
1299            let mut interval_timer = tokio::time::interval(interval);
1300            interval_timer.tick().await; // Skip first immediate tick
1301
1302            loop {
1303                tokio::select! {
1304                    () = cancellation_token.cancelled() => {
1305                        log::info!("Instrument refresh task cancelled");
1306                        break;
1307                    }
1308                    _ = interval_timer.tick() => {
1309                        log::debug!("Refreshing instruments");
1310
1311                        // Populates all HTTP cache layers (instruments, clob_pair_id, market_params)
1312                        match http_client.fetch_and_cache_instruments().await {
1313                            Ok(()) => {
1314                                let instruments: Vec<_> = http_client
1315                                    .instruments()
1316                                    .iter()
1317                                    .map(|entry| entry.value().clone())
1318                                    .collect();
1319
1320                                log::debug!("Refreshed {} instruments", instruments.len());
1321
1322                                for instrument in &instruments {
1323                                    upsert_instrument(&instruments_cache, instrument.clone());
1324                                }
1325
1326                                // Propagate to WS handler for message parsing
1327                                ws_client.cache_instruments(instruments);
1328                            }
1329                            Err(e) => {
1330                                log::error!("Failed to refresh instruments: {e}");
1331                            }
1332                        }
1333                    }
1334                }
1335            }
1336        });
1337
1338        self.tasks.push(task);
1339        Ok(())
1340    }
1341
1342    /// Start a background task to periodically refresh orderbook snapshots.
1343    ///
1344    /// This prevents stale orderbooks from missed WebSocket messages due to:
1345    /// - Network issues or message drops
1346    /// - dYdX validator delays
1347    /// - WebSocket reconnection gaps
1348    ///
1349    /// The task fetches fresh snapshots via HTTP at the configured interval
1350    /// and applies them to the local orderbooks.
1351    fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1352        let interval_secs = match self.config.orderbook_refresh_interval_secs {
1353            Some(secs) if secs > 0 => secs,
1354            _ => {
1355                log::info!("Orderbook snapshot refresh disabled (interval not configured)");
1356                return Ok(());
1357            }
1358        };
1359
1360        let interval = Duration::from_secs(interval_secs);
1361        let http_client = self.http_client.clone();
1362        let instruments = self.instruments.clone();
1363        let order_books = self.order_books.clone();
1364        let active_subs = self.active_orderbook_subs.clone();
1365        let cancellation_token = self.cancellation_token.clone();
1366        let data_sender = self.data_sender.clone();
1367
1368        log::info!("Starting orderbook snapshot refresh task (interval: {interval_secs}s)");
1369
1370        let task = get_runtime().spawn(async move {
1371            let mut interval_timer = tokio::time::interval(interval);
1372            interval_timer.tick().await; // Skip first immediate tick
1373
1374            loop {
1375                tokio::select! {
1376                    () = cancellation_token.cancelled() => {
1377                        log::info!("Orderbook refresh task cancelled");
1378                        break;
1379                    }
1380                    _ = interval_timer.tick() => {
1381                        let active_instruments: Vec<InstrumentId> = active_subs
1382                            .iter()
1383                            .map(|entry| *entry.key())
1384                            .collect();
1385
1386                        if active_instruments.is_empty() {
1387                            log::debug!("No active orderbook subscriptions to refresh");
1388                            continue;
1389                        }
1390
1391                        log::debug!(
1392                            "Refreshing {} orderbook snapshots",
1393                            active_instruments.len()
1394                        );
1395
1396                        for instrument_id in active_instruments {
1397                            // Get instrument for parsing
1398                            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1399                                Some(inst) => inst.clone(),
1400                                None => {
1401                                    log::warn!(
1402                                        "Cannot refresh orderbook: no instrument for {instrument_id}"
1403                                    );
1404                                    continue;
1405                                }
1406                            };
1407
1408                            // Fetch snapshot via HTTP (strip -PERP suffix for dYdX API)
1409                            let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1410                            let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1411
1412                            let snapshot = match snapshot_result {
1413                                Ok(s) => s,
1414                                Err(e) => {
1415                                    log::error!(
1416                                        "Failed to fetch orderbook snapshot for {instrument_id}: {e}"
1417                                    );
1418                                    continue;
1419                                }
1420                            };
1421
1422                            // Convert HTTP snapshot to OrderBookDeltas
1423                            let deltas_result = Self::parse_orderbook_snapshot(
1424                                instrument_id,
1425                                &snapshot,
1426                                &instrument,
1427                            );
1428
1429                            let deltas = match deltas_result {
1430                                Ok(d) => d,
1431                                Err(e) => {
1432                                    log::error!(
1433                                        "Failed to parse orderbook snapshot for {instrument_id}: {e}"
1434                                    );
1435                                    continue;
1436                                }
1437                            };
1438
1439                            // Apply snapshot to local orderbook
1440                            if let Some(mut book) = order_books.get_mut(&instrument_id) {
1441                                if let Err(e) = book.apply_deltas(&deltas) {
1442                                    log::error!(
1443                                        "Failed to apply orderbook snapshot for {instrument_id}: {e}"
1444                                    );
1445                                    continue;
1446                                }
1447
1448                                log::debug!(
1449                                    "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1450                                    instrument_id,
1451                                    book.best_bid_price(),
1452                                    book.best_ask_price()
1453                                );
1454                            }
1455
1456                            // Emit the snapshot deltas
1457                            let data = NautilusData::from(OrderBookDeltas_API::new(deltas));
1458                            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1459                                log::error!("Failed to emit orderbook snapshot: {e}");
1460                            }
1461                        }
1462                    }
1463                }
1464            }
1465        });
1466
1467        self.tasks.push(task);
1468        Ok(())
1469    }
1470
1471    /// Parse HTTP orderbook snapshot into OrderBookDeltas.
1472    ///
1473    /// Converts the REST API orderbook format into Nautilus deltas with CLEAR + ADD actions.
1474    fn parse_orderbook_snapshot(
1475        instrument_id: InstrumentId,
1476        snapshot: &crate::http::models::OrderbookResponse,
1477        instrument: &InstrumentAny,
1478    ) -> anyhow::Result<OrderBookDeltas> {
1479        let ts_init = get_atomic_clock_realtime().get_time_ns();
1480        let mut deltas = Vec::new();
1481
1482        // Add clear delta first
1483        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1484
1485        let price_precision = instrument.price_precision();
1486        let size_precision = instrument.size_precision();
1487
1488        let bids_len = snapshot.bids.len();
1489        let asks_len = snapshot.asks.len();
1490
1491        // Add bid levels
1492        for (idx, bid) in snapshot.bids.iter().enumerate() {
1493            let is_last = idx == bids_len - 1 && asks_len == 0;
1494            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1495
1496            let price = Price::from_decimal_dp(bid.price, price_precision)
1497                .context("failed to parse bid price")?;
1498            let size = Quantity::from_decimal_dp(bid.size, size_precision)
1499                .context("failed to parse bid size")?;
1500
1501            let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1502            deltas.push(OrderBookDelta::new(
1503                instrument_id,
1504                BookAction::Add,
1505                order,
1506                flags,
1507                0,
1508                ts_init,
1509                ts_init,
1510            ));
1511        }
1512
1513        // Add ask levels
1514        for (idx, ask) in snapshot.asks.iter().enumerate() {
1515            let is_last = idx == asks_len - 1;
1516            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1517
1518            let price = Price::from_decimal_dp(ask.price, price_precision)
1519                .context("failed to parse ask price")?;
1520            let size = Quantity::from_decimal_dp(ask.size, size_precision)
1521                .context("failed to parse ask size")?;
1522
1523            let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1524            deltas.push(OrderBookDelta::new(
1525                instrument_id,
1526                BookAction::Add,
1527                order,
1528                flags,
1529                0,
1530                ts_init,
1531                ts_init,
1532            ));
1533        }
1534
1535        Ok(OrderBookDeltas::new(instrument_id, deltas))
1536    }
1537
1538    /// Get a cached instrument by symbol.
1539    #[must_use]
1540    pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1541        self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1542    }
1543
1544    /// Get all cached instruments.
1545    #[must_use]
1546    pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1547        self.instruments.iter().map(|i| i.clone()).collect()
1548    }
1549
1550    fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1551        self.order_books
1552            .entry(instrument_id)
1553            .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1554    }
1555
1556    /// Get BarType for a given WebSocket candle topic.
1557    #[must_use]
1558    pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1559        self.bar_type_mappings
1560            .get(topic)
1561            .map(|entry| *entry.value())
1562    }
1563
1564    /// Get all registered bar topics.
1565    #[must_use]
1566    pub fn get_bar_topics(&self) -> Vec<String> {
1567        self.bar_type_mappings
1568            .iter()
1569            .map(|entry| entry.key().clone())
1570            .collect()
1571    }
1572
1573    /// Convert a dYdX HTTP candle into a Nautilus [`Bar`].
1574    ///
1575    /// This mirrors the conversion logic used in other adapters (for example
1576    /// Hyperliquid), using the instrument price/size precision and mapping the
1577    /// candle start time to `ts_init` with `ts_event` at the end of the bar
1578    /// interval.
1579    fn candle_to_bar(
1580        candle: &crate::http::models::Candle,
1581        bar_type: BarType,
1582        price_precision: u8,
1583        size_precision: u8,
1584        bar_secs: i64,
1585        clock: &AtomicTime,
1586    ) -> anyhow::Result<Bar> {
1587        // Convert candle start time to UnixNanos (ts_init).
1588        let ts_init =
1589            datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1590
1591        // Treat ts_event as the end of the bar interval.
1592        let ts_event_ns = ts_init
1593            .as_u64()
1594            .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1595        let ts_event = UnixNanos::from(ts_event_ns);
1596
1597        let open = Price::from_decimal_dp(candle.open, price_precision)
1598            .context("failed to parse candle open price")?;
1599        let high = Price::from_decimal_dp(candle.high, price_precision)
1600            .context("failed to parse candle high price")?;
1601        let low = Price::from_decimal_dp(candle.low, price_precision)
1602            .context("failed to parse candle low price")?;
1603        let close = Price::from_decimal_dp(candle.close, price_precision)
1604            .context("failed to parse candle close price")?;
1605
1606        // Use base token volume as bar volume.
1607        let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1608            .context("failed to parse candle base_token_volume")?;
1609
1610        Ok(Bar::new(
1611            bar_type, open, high, low, close, volume, ts_event, ts_init,
1612        ))
1613    }
1614
1615    fn handle_ws_message(
1616        message: crate::websocket::enums::NautilusWsMessage,
1617        ctx: &WsMessageContext,
1618    ) {
1619        match message {
1620            crate::websocket::enums::NautilusWsMessage::Data(payloads) => {
1621                Self::handle_data_message(payloads, &ctx.data_sender, &ctx.incomplete_bars);
1622            }
1623            crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
1624                Self::handle_deltas_message(
1625                    *deltas,
1626                    &ctx.data_sender,
1627                    &ctx.order_books,
1628                    &ctx.last_quotes,
1629                    &ctx.instruments,
1630                );
1631            }
1632            crate::websocket::enums::NautilusWsMessage::OraclePrices(oracle_prices) => {
1633                Self::handle_oracle_prices(oracle_prices, &ctx.instruments, &ctx.data_sender);
1634            }
1635            crate::websocket::enums::NautilusWsMessage::Error(err) => {
1636                log::error!("dYdX WS error: {err}");
1637            }
1638            crate::websocket::enums::NautilusWsMessage::Reconnected => {
1639                log::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1640
1641                let total_subs = ctx.active_orderbook_subs.len()
1642                    + ctx.active_trade_subs.len()
1643                    + ctx.active_bar_subs.len();
1644
1645                if total_subs == 0 {
1646                    log::debug!("No active subscriptions to restore");
1647                    return;
1648                }
1649
1650                log::info!(
1651                    "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1652                    total_subs,
1653                    ctx.active_orderbook_subs.len(),
1654                    ctx.active_trade_subs.len(),
1655                    ctx.active_bar_subs.len()
1656                );
1657
1658                // Re-subscribe to orderbook channels
1659                for entry in ctx.active_orderbook_subs.iter() {
1660                    let instrument_id = *entry.key();
1661                    let ws_clone = ctx.ws_client.clone();
1662                    get_runtime().spawn(async move {
1663                        if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1664                            log::error!(
1665                                "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1666                            );
1667                        } else {
1668                            log::debug!("Re-subscribed to orderbook for {instrument_id}");
1669                        }
1670                    });
1671                }
1672
1673                // Re-subscribe to trade channels
1674                for entry in ctx.active_trade_subs.iter() {
1675                    let instrument_id = *entry.key();
1676                    let ws_clone = ctx.ws_client.clone();
1677                    get_runtime().spawn(async move {
1678                        if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1679                            log::error!(
1680                                "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1681                            );
1682                        } else {
1683                            log::debug!("Re-subscribed to trades for {instrument_id}");
1684                        }
1685                    });
1686                }
1687
1688                // Re-subscribe to candle/bar channels
1689                for entry in ctx.active_bar_subs.iter() {
1690                    let (instrument_id, resolution) = entry.key();
1691                    let instrument_id = *instrument_id;
1692                    let resolution = resolution.clone();
1693                    let bar_type = *entry.value();
1694                    let ws_clone = ctx.ws_client.clone();
1695
1696                    // Re-register bar type with handler
1697                    let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1698                    let topic = format!("{ticker}/{resolution}");
1699                    if let Err(e) = ctx.ws_client.send_command(
1700                        crate::websocket::handler::HandlerCommand::RegisterBarType {
1701                            topic,
1702                            bar_type,
1703                        },
1704                    ) {
1705                        log::warn!(
1706                            "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1707                        );
1708                    }
1709
1710                    get_runtime().spawn(async move {
1711                        if let Err(e) =
1712                            ws_clone.subscribe_candles(instrument_id, &resolution).await
1713                        {
1714                            log::error!(
1715                                "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1716                            );
1717                        } else {
1718                            log::debug!(
1719                                "Re-subscribed to candles for {instrument_id} ({resolution})"
1720                            );
1721                        }
1722                    });
1723                }
1724
1725                log::info!("Completed re-subscription requests after reconnection");
1726            }
1727            crate::websocket::enums::NautilusWsMessage::BlockHeight(_) => {
1728                log::debug!(
1729                    "Ignoring block height message on dYdX data client (handled by execution adapter)"
1730                );
1731            }
1732            crate::websocket::enums::NautilusWsMessage::Order(_)
1733            | crate::websocket::enums::NautilusWsMessage::Fill(_)
1734            | crate::websocket::enums::NautilusWsMessage::Position(_)
1735            | crate::websocket::enums::NautilusWsMessage::AccountState(_)
1736            | crate::websocket::enums::NautilusWsMessage::SubaccountSubscribed(_)
1737            | crate::websocket::enums::NautilusWsMessage::SubaccountsChannelData(_) => {
1738                log::debug!(
1739                    "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1740                );
1741            }
1742        }
1743    }
1744
1745    fn handle_data_message(
1746        payloads: Vec<NautilusData>,
1747        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1748        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1749    ) {
1750        for data in payloads {
1751            // Filter bars through incomplete bars cache
1752            if let NautilusData::Bar(bar) = data {
1753                Self::handle_bar_message(bar, data_sender, incomplete_bars);
1754            } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1755                log::error!("Failed to emit data event: {e}");
1756            }
1757        }
1758    }
1759
1760    /// Handles bar messages by tracking incomplete bars and only emitting completed ones.
1761    ///
1762    /// WebSocket candle updates arrive continuously. This method:
1763    /// - Caches bars where ts_event > current_time (incomplete)
1764    /// - Emits bars where ts_event <= current_time (complete)
1765    /// - Updates cached incomplete bars with latest data
1766    fn handle_bar_message(
1767        bar: Bar,
1768        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1769        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1770    ) {
1771        let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1772        let bar_type = bar.bar_type;
1773
1774        if bar.ts_event <= current_time_ns {
1775            // Bar is complete - emit it and remove from incomplete cache
1776            incomplete_bars.remove(&bar_type);
1777            if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1778                log::error!("Failed to emit completed bar: {e}");
1779            }
1780        } else {
1781            // Bar is incomplete - cache it (updates existing entry)
1782            log::trace!(
1783                "Caching incomplete bar for {} (ts_event={}, current={})",
1784                bar_type,
1785                bar.ts_event,
1786                current_time_ns
1787            );
1788            incomplete_bars.insert(bar_type, bar);
1789        }
1790    }
1791
1792    /// Resolves a crossed order book by generating synthetic deltas to uncross it.
1793    ///
1794    /// dYdX order books can become crossed due to:
1795    /// - Validator delays in order acknowledgment across the network
1796    /// - Missed or delayed WebSocket messages from the venue
1797    ///
1798    /// This function detects when bid_price >= ask_price and iteratively removes
1799    /// the smaller side while adjusting the larger side until the book is uncrossed.
1800    ///
1801    /// # Algorithm
1802    ///
1803    /// For each crossed level:
1804    /// - If bid_size > ask_size: DELETE ask, UPDATE bid (reduce by ask_size)
1805    /// - If bid_size < ask_size: DELETE bid, UPDATE ask (reduce by bid_size)
1806    /// - If bid_size == ask_size: DELETE both bid and ask
1807    ///
1808    /// The algorithm continues until no more crosses exist or the book is empty.
1809    fn resolve_crossed_order_book(
1810        book: &mut OrderBook,
1811        venue_deltas: OrderBookDeltas,
1812        instrument: &InstrumentAny,
1813    ) -> anyhow::Result<OrderBookDeltas> {
1814        let instrument_id = venue_deltas.instrument_id;
1815        let ts_init = venue_deltas.ts_init;
1816        let mut all_deltas = venue_deltas.deltas.clone();
1817
1818        // Apply the original venue deltas first
1819        book.apply_deltas(&venue_deltas)?;
1820
1821        // Check if orderbook is crossed
1822        let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1823            (book.best_bid_price(), book.best_ask_price())
1824        {
1825            bid_price >= ask_price
1826        } else {
1827            false
1828        };
1829
1830        // Iteratively uncross the orderbook
1831        while is_crossed {
1832            log::debug!(
1833                "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1834                instrument_id,
1835                book.best_bid_price(),
1836                book.best_ask_price()
1837            );
1838
1839            let bid_price = match book.best_bid_price() {
1840                Some(p) => p,
1841                None => break,
1842            };
1843            let ask_price = match book.best_ask_price() {
1844                Some(p) => p,
1845                None => break,
1846            };
1847            let bid_size = match book.best_bid_size() {
1848                Some(s) => s,
1849                None => break,
1850            };
1851            let ask_size = match book.best_ask_size() {
1852                Some(s) => s,
1853                None => break,
1854            };
1855
1856            let mut temp_deltas = Vec::new();
1857
1858            if bid_size > ask_size {
1859                // Remove ask level, reduce bid level
1860                let new_bid_size = Quantity::new(
1861                    bid_size.as_f64() - ask_size.as_f64(),
1862                    instrument.size_precision(),
1863                );
1864                temp_deltas.push(OrderBookDelta::new(
1865                    instrument_id,
1866                    BookAction::Update,
1867                    BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1868                    0,
1869                    0,
1870                    ts_init,
1871                    ts_init,
1872                ));
1873                temp_deltas.push(OrderBookDelta::new(
1874                    instrument_id,
1875                    BookAction::Delete,
1876                    BookOrder::new(
1877                        OrderSide::Sell,
1878                        ask_price,
1879                        Quantity::new(0.0, instrument.size_precision()),
1880                        0,
1881                    ),
1882                    0,
1883                    0,
1884                    ts_init,
1885                    ts_init,
1886                ));
1887            } else if bid_size < ask_size {
1888                // Remove bid level, reduce ask level
1889                let new_ask_size = Quantity::new(
1890                    ask_size.as_f64() - bid_size.as_f64(),
1891                    instrument.size_precision(),
1892                );
1893                temp_deltas.push(OrderBookDelta::new(
1894                    instrument_id,
1895                    BookAction::Update,
1896                    BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1897                    0,
1898                    0,
1899                    ts_init,
1900                    ts_init,
1901                ));
1902                temp_deltas.push(OrderBookDelta::new(
1903                    instrument_id,
1904                    BookAction::Delete,
1905                    BookOrder::new(
1906                        OrderSide::Buy,
1907                        bid_price,
1908                        Quantity::new(0.0, instrument.size_precision()),
1909                        0,
1910                    ),
1911                    0,
1912                    0,
1913                    ts_init,
1914                    ts_init,
1915                ));
1916            } else {
1917                // Equal sizes: remove both levels
1918                temp_deltas.push(OrderBookDelta::new(
1919                    instrument_id,
1920                    BookAction::Delete,
1921                    BookOrder::new(
1922                        OrderSide::Buy,
1923                        bid_price,
1924                        Quantity::new(0.0, instrument.size_precision()),
1925                        0,
1926                    ),
1927                    0,
1928                    0,
1929                    ts_init,
1930                    ts_init,
1931                ));
1932                temp_deltas.push(OrderBookDelta::new(
1933                    instrument_id,
1934                    BookAction::Delete,
1935                    BookOrder::new(
1936                        OrderSide::Sell,
1937                        ask_price,
1938                        Quantity::new(0.0, instrument.size_precision()),
1939                        0,
1940                    ),
1941                    0,
1942                    0,
1943                    ts_init,
1944                    ts_init,
1945                ));
1946            }
1947
1948            // Apply temporary deltas to the book
1949            let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
1950            book.apply_deltas(&temp_deltas_obj)?;
1951            all_deltas.extend(temp_deltas);
1952
1953            // Check if still crossed
1954            is_crossed = if let (Some(bid_price), Some(ask_price)) =
1955                (book.best_bid_price(), book.best_ask_price())
1956            {
1957                bid_price >= ask_price
1958            } else {
1959                false
1960            };
1961        }
1962
1963        // Set F_LAST flag on the final delta
1964        if let Some(last_delta) = all_deltas.last_mut() {
1965            last_delta.flags = RecordFlag::F_LAST as u8;
1966        }
1967
1968        Ok(OrderBookDeltas::new(instrument_id, all_deltas))
1969    }
1970
1971    fn handle_deltas_message(
1972        deltas: OrderBookDeltas,
1973        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1974        order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
1975        last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
1976        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
1977    ) {
1978        let instrument_id = deltas.instrument_id;
1979
1980        // Get instrument for crossed orderbook resolution
1981        let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1982            Some(inst) => inst.clone(),
1983            None => {
1984                log::error!("Cannot resolve crossed order book: no instrument for {instrument_id}");
1985                // Still emit the raw deltas even without instrument
1986                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
1987                    OrderBookDeltas_API::new(deltas),
1988                ))) {
1989                    log::error!("Failed to emit order book deltas: {e}");
1990                }
1991                return;
1992            }
1993        };
1994
1995        // Get or create order book
1996        let mut book = order_books
1997            .entry(instrument_id)
1998            .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1999
2000        // Resolve crossed orderbook (applies deltas internally)
2001        let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2002        {
2003            Ok(d) => d,
2004            Err(e) => {
2005                log::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2006                return;
2007            }
2008        };
2009
2010        // Generate QuoteTick from updated top-of-book
2011        // Edge case: If orderbook is empty after deltas, fall back to last quote
2012        let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2013            (book.best_bid_price(), book.best_ask_price())
2014            && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2015        {
2016            Some(QuoteTick::new(
2017                instrument_id,
2018                bid_price,
2019                ask_price,
2020                bid_size,
2021                ask_size,
2022                resolved_deltas.ts_event,
2023                resolved_deltas.ts_init,
2024            ))
2025        } else {
2026            // Edge case: Empty orderbook levels - use last quote as fallback
2027            if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2028                log::debug!(
2029                    "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2030                );
2031                last_quotes.get(&instrument_id).map(|q| *q)
2032            } else {
2033                None
2034            }
2035        };
2036
2037        if let Some(quote) = quote_opt {
2038            // Only emit when top-of-book changes
2039            let emit_quote =
2040                !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2041
2042            if emit_quote {
2043                last_quotes.insert(instrument_id, quote);
2044                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2045                    log::error!("Failed to emit quote tick: {e}");
2046                }
2047            }
2048        } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2049            // Partial orderbook (only one side) - log but don't emit
2050            log::debug!(
2051                "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2052                book.best_bid_price(),
2053                book.best_ask_price()
2054            );
2055        }
2056
2057        // Emit the resolved order book deltas
2058        let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2059        if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2060            log::error!("Failed to emit order book deltas event: {e}");
2061        }
2062    }
2063
2064    fn handle_oracle_prices(
2065        oracle_prices: std::collections::HashMap<
2066            String,
2067            crate::websocket::messages::DydxOraclePriceMarket,
2068        >,
2069        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2070        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2071    ) {
2072        let ts_init = get_atomic_clock_realtime().get_time_ns();
2073
2074        for (symbol_str, oracle_market) in oracle_prices {
2075            // Oracle prices use market format (e.g., "BTC-USD"), but instruments are keyed
2076            // by perpetual symbol (e.g., "BTC-USD-PERP")
2077            let perp_symbol = format!("{symbol_str}-PERP");
2078            let symbol = Ustr::from(&perp_symbol);
2079
2080            // Get instrument to access instrument_id
2081            let Some(instrument) = instruments.get(&symbol) else {
2082                log::debug!(
2083                    "Received oracle price for unknown instrument (not cached yet): symbol={symbol}"
2084                );
2085                continue;
2086            };
2087
2088            let instrument_id = instrument.id();
2089
2090            // Parse oracle price string to Price
2091            let oracle_price_str = &oracle_market.oracle_price;
2092            let Ok(oracle_price_dec) = oracle_price_str.parse::<Decimal>() else {
2093                log::error!(
2094                    "Failed to parse oracle price: symbol={symbol}, price_str={oracle_price_str}"
2095                );
2096                continue;
2097            };
2098
2099            let price_precision = instrument.price_precision();
2100            let Ok(oracle_price) = Price::from_decimal_dp(oracle_price_dec, price_precision) else {
2101                log::error!(
2102                    "Failed to create oracle Price: symbol={symbol}, price={oracle_price_dec}"
2103                );
2104                continue;
2105            };
2106
2107            let oracle_price_event = DydxOraclePrice::new(
2108                instrument_id,
2109                oracle_price,
2110                ts_init, // Use ts_init as ts_event since dYdX doesn't provide event timestamp
2111                ts_init,
2112            );
2113
2114            log::debug!(
2115                "Received dYdX oracle price: instrument_id={instrument_id}, oracle_price={oracle_price}, {oracle_price_event:?}"
2116            );
2117
2118            let data = NautilusData::IndexPriceUpdate(IndexPriceUpdate::new(
2119                instrument_id,
2120                oracle_price,
2121                ts_init, // Use ts_init as ts_event since dYdX doesn't provide event timestamp
2122                ts_init,
2123            ));
2124
2125            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2126                log::error!("Failed to emit oracle price: {e}");
2127            }
2128        }
2129    }
2130}
2131
2132#[cfg(test)]
2133mod tests {
2134    use std::{collections::HashMap, net::SocketAddr, time::Duration};
2135
2136    use axum::{
2137        Router,
2138        extract::{Path, Query, State},
2139        response::Json,
2140        routing::get,
2141    };
2142    use chrono::Utc;
2143    use indexmap::IndexMap;
2144    use nautilus_common::{
2145        live::runner::set_data_event_sender,
2146        messages::{DataEvent, data::DataResponse},
2147        testing::wait_until_async,
2148    };
2149    use nautilus_core::UUID4;
2150    use nautilus_model::{
2151        data::{
2152            BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2153            TradeTick, order::BookOrder,
2154        },
2155        enums::{
2156            AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2157            PriceType,
2158        },
2159        identifiers::{ClientId, InstrumentId, Symbol, Venue},
2160        instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2161        orderbook::OrderBook,
2162        types::{Currency, Price, Quantity},
2163    };
2164    use rstest::rstest;
2165    use rust_decimal::Decimal;
2166    use rust_decimal_macros::dec;
2167    use tokio::net::{TcpListener, TcpStream};
2168
2169    use super::*;
2170    use crate::http::models::{Candle, CandlesResponse};
2171
2172    fn setup_test_env() {
2173        // Initialize data event sender for tests
2174        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2175        set_data_event_sender(sender);
2176    }
2177
2178    async fn wait_for_server(addr: SocketAddr) {
2179        wait_until_async(
2180            || async move { TcpStream::connect(addr).await.is_ok() },
2181            Duration::from_secs(5),
2182        )
2183        .await;
2184    }
2185
2186    fn create_test_ws_client() -> DydxWebSocketClient {
2187        DydxWebSocketClient::new_public("ws://test".to_string(), None)
2188    }
2189
2190    #[rstest]
2191    fn test_new_data_client() {
2192        setup_test_env();
2193
2194        let client_id = ClientId::from("DYDX-001");
2195        let config = DydxDataClientConfig::default();
2196        let http_client = DydxHttpClient::default();
2197
2198        let client = DydxDataClient::new(client_id, config, http_client, create_test_ws_client());
2199        assert!(client.is_ok());
2200
2201        let client = client.unwrap();
2202        assert_eq!(client.client_id(), client_id);
2203        assert_eq!(client.venue(), *DYDX_VENUE);
2204        assert!(!client.is_connected());
2205    }
2206
2207    #[tokio::test]
2208    async fn test_data_client_lifecycle() {
2209        setup_test_env();
2210
2211        let client_id = ClientId::from("DYDX-001");
2212        let config = DydxDataClientConfig::default();
2213        let http_client = DydxHttpClient::default();
2214
2215        let mut client =
2216            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2217
2218        // Test start
2219        assert!(client.start().is_ok());
2220
2221        // Test stop
2222        assert!(client.stop().is_ok());
2223        assert!(!client.is_connected());
2224
2225        // Test reset
2226        assert!(client.reset().is_ok());
2227
2228        // Test dispose
2229        assert!(client.dispose().is_ok());
2230    }
2231
2232    #[rstest]
2233    fn test_subscribe_unsubscribe_instruments_noop() {
2234        setup_test_env();
2235
2236        let client_id = ClientId::from("DYDX-TEST");
2237        let config = DydxDataClientConfig::default();
2238        let http_client = DydxHttpClient::default();
2239
2240        let mut client =
2241            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2242
2243        let venue = *DYDX_VENUE;
2244        let command_id = UUID4::new();
2245        let ts_init = get_atomic_clock_realtime().get_time_ns();
2246
2247        let subscribe = SubscribeInstruments {
2248            client_id: Some(client_id),
2249            venue,
2250            command_id,
2251            ts_init,
2252            correlation_id: None,
2253            params: None,
2254        };
2255        let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None, None);
2256
2257        // No-op methods should succeed even without a WebSocket client.
2258        assert!(client.subscribe_instruments(&subscribe).is_ok());
2259        assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2260    }
2261
2262    #[rstest]
2263    fn test_bar_type_mappings_registration() {
2264        setup_test_env();
2265
2266        let client_id = ClientId::from("DYDX-TEST");
2267        let config = DydxDataClientConfig::default();
2268        let http_client = DydxHttpClient::default();
2269
2270        let client =
2271            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2272
2273        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2274        let spec = BarSpecification {
2275            step: std::num::NonZeroUsize::new(1).unwrap(),
2276            aggregation: BarAggregation::Minute,
2277            price_type: PriceType::Last,
2278        };
2279        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2280
2281        // Initially no topics registered
2282        assert!(client.get_bar_topics().is_empty());
2283        assert!(client.get_bar_type_for_topic("BTC-USD/1MIN").is_none());
2284
2285        // Register topic
2286        client
2287            .bar_type_mappings
2288            .insert("BTC-USD/1MIN".to_string(), bar_type);
2289
2290        // Verify registration
2291        assert_eq!(client.get_bar_topics().len(), 1);
2292        assert!(
2293            client
2294                .get_bar_topics()
2295                .contains(&"BTC-USD/1MIN".to_string())
2296        );
2297        assert_eq!(
2298            client.get_bar_type_for_topic("BTC-USD/1MIN"),
2299            Some(bar_type)
2300        );
2301
2302        // Register another topic
2303        let spec_5min = BarSpecification {
2304            step: std::num::NonZeroUsize::new(5).unwrap(),
2305            aggregation: BarAggregation::Minute,
2306            price_type: PriceType::Last,
2307        };
2308        let bar_type_5min = BarType::new(instrument_id, spec_5min, AggregationSource::External);
2309        client
2310            .bar_type_mappings
2311            .insert("BTC-USD/5MINS".to_string(), bar_type_5min);
2312
2313        // Verify multiple topics
2314        assert_eq!(client.get_bar_topics().len(), 2);
2315        assert_eq!(
2316            client.get_bar_type_for_topic("BTC-USD/5MINS"),
2317            Some(bar_type_5min)
2318        );
2319    }
2320
2321    #[rstest]
2322    fn test_bar_type_mappings_unregistration() {
2323        setup_test_env();
2324
2325        let client_id = ClientId::from("DYDX-TEST");
2326        let config = DydxDataClientConfig::default();
2327        let http_client = DydxHttpClient::default();
2328
2329        let client =
2330            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2331
2332        let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
2333        let spec = BarSpecification {
2334            step: std::num::NonZeroUsize::new(1).unwrap(),
2335            aggregation: BarAggregation::Hour,
2336            price_type: PriceType::Last,
2337        };
2338        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2339
2340        // Register topic
2341        client
2342            .bar_type_mappings
2343            .insert("ETH-USD/1HOUR".to_string(), bar_type);
2344        assert_eq!(client.get_bar_topics().len(), 1);
2345
2346        // Unregister topic
2347        client.bar_type_mappings.remove("ETH-USD/1HOUR");
2348
2349        // Verify unregistration
2350        assert!(client.get_bar_topics().is_empty());
2351        assert!(client.get_bar_type_for_topic("ETH-USD/1HOUR").is_none());
2352    }
2353
2354    #[rstest]
2355    fn test_bar_type_mappings_lookup_nonexistent() {
2356        setup_test_env();
2357
2358        let client_id = ClientId::from("DYDX-TEST");
2359        let config = DydxDataClientConfig::default();
2360        let http_client = DydxHttpClient::default();
2361
2362        let client =
2363            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2364
2365        // Lookup non-existent topic
2366        assert!(client.get_bar_type_for_topic("NONEXISTENT/1MIN").is_none());
2367        assert!(client.get_bar_topics().is_empty());
2368    }
2369
2370    #[tokio::test]
2371    async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2372        setup_test_env();
2373
2374        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2375        let instruments = Arc::new(DashMap::new());
2376        let order_books = Arc::new(DashMap::new());
2377        let last_quotes = Arc::new(DashMap::new());
2378        let ws_client = DydxWebSocketClient::new_public("ws://test".to_string(), None);
2379        let active_orderbook_subs = Arc::new(DashMap::new());
2380        let active_trade_subs = Arc::new(DashMap::new());
2381        let active_bar_subs = Arc::new(DashMap::new());
2382
2383        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2384        let bar_ts = get_atomic_clock_realtime().get_time_ns();
2385
2386        // Add a test instrument to the cache (required for crossed book resolution)
2387        let symbol = Symbol::from("BTC-USD-PERP");
2388        let instrument = CryptoPerpetual::new(
2389            instrument_id,
2390            symbol,
2391            Currency::BTC(),
2392            Currency::USD(),
2393            Currency::USD(),
2394            false,
2395            2,
2396            4,
2397            Price::from("0.01"),
2398            Quantity::from("0.0001"),
2399            None,
2400            None,
2401            None,
2402            None,
2403            None,
2404            None,
2405            None,
2406            None,
2407            None,
2408            None,
2409            None,
2410            None,
2411            bar_ts,
2412            bar_ts,
2413        );
2414        instruments.insert(
2415            Ustr::from("BTC-USD-PERP"),
2416            InstrumentAny::CryptoPerpetual(instrument),
2417        );
2418
2419        let price = Price::from("100.00");
2420        let size = Quantity::from("1.0");
2421
2422        // Create both bid and ask deltas to generate a quote
2423        let bid_delta = OrderBookDelta::new(
2424            instrument_id,
2425            BookAction::Add,
2426            BookOrder::new(OrderSide::Buy, price, size, 1),
2427            0,
2428            1,
2429            bar_ts,
2430            bar_ts,
2431        );
2432        let ask_delta = OrderBookDelta::new(
2433            instrument_id,
2434            BookAction::Add,
2435            BookOrder::new(OrderSide::Sell, Price::from("101.00"), size, 1),
2436            0,
2437            1,
2438            bar_ts,
2439            bar_ts,
2440        );
2441        let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2442
2443        let message = crate::websocket::enums::NautilusWsMessage::Deltas(Box::new(deltas));
2444
2445        let incomplete_bars = Arc::new(DashMap::new());
2446        let ctx = WsMessageContext {
2447            data_sender: sender,
2448            instruments,
2449            order_books,
2450            last_quotes,
2451            ws_client,
2452            active_orderbook_subs,
2453            active_trade_subs,
2454            active_bar_subs,
2455            incomplete_bars,
2456        };
2457        DydxDataClient::handle_ws_message(message, &ctx);
2458
2459        // Ensure order book was created and top-of-book quote cached.
2460        assert!(ctx.order_books.get(&instrument_id).is_some());
2461        assert!(ctx.last_quotes.get(&instrument_id).is_some());
2462
2463        // Ensure a quote and deltas Data events were emitted.
2464        let mut saw_quote = false;
2465        let mut saw_deltas = false;
2466
2467        while let Ok(event) = rx.try_recv() {
2468            if let DataEvent::Data(data) = event {
2469                match data {
2470                    NautilusData::Quote(_) => saw_quote = true,
2471                    NautilusData::Deltas(_) => saw_deltas = true,
2472                    _ => {}
2473                }
2474            }
2475        }
2476
2477        assert!(saw_quote);
2478        assert!(saw_deltas);
2479    }
2480
2481    #[rstest]
2482    fn test_handle_ws_message_error_does_not_panic() {
2483        // Ensure malformed/error WebSocket messages are logged and ignored
2484        // without panicking or affecting client state.
2485        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2486        let instruments = Arc::new(DashMap::new());
2487        let order_books = Arc::new(DashMap::new());
2488        let last_quotes = Arc::new(DashMap::new());
2489        let ws_client = DydxWebSocketClient::new_public("ws://test".to_string(), None);
2490        let active_orderbook_subs = Arc::new(DashMap::new());
2491        let active_trade_subs = Arc::new(DashMap::new());
2492        let active_bar_subs = Arc::new(DashMap::new());
2493        let incomplete_bars = Arc::new(DashMap::new());
2494
2495        let ctx = WsMessageContext {
2496            data_sender: sender,
2497            instruments,
2498            order_books,
2499            last_quotes,
2500            ws_client,
2501            active_orderbook_subs,
2502            active_trade_subs,
2503            active_bar_subs,
2504            incomplete_bars,
2505        };
2506
2507        let err = crate::websocket::error::DydxWebSocketError::from_message(
2508            "malformed WebSocket payload".to_string(),
2509        );
2510
2511        DydxDataClient::handle_ws_message(
2512            crate::websocket::enums::NautilusWsMessage::Error(err),
2513            &ctx,
2514        );
2515    }
2516
2517    #[tokio::test]
2518    async fn test_request_bars_partitioning_math_does_not_panic() {
2519        setup_test_env();
2520
2521        let client_id = ClientId::from("DYDX-BARS");
2522        let config = DydxDataClientConfig::default();
2523        let http_client = DydxHttpClient::default();
2524
2525        let client =
2526            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2527
2528        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2529        let spec = BarSpecification {
2530            step: std::num::NonZeroUsize::new(1).unwrap(),
2531            aggregation: BarAggregation::Minute,
2532            price_type: PriceType::Last,
2533        };
2534        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2535
2536        let now = Utc::now();
2537        let start = Some(now - chrono::Duration::hours(10));
2538        let end = Some(now);
2539
2540        let request = RequestBars::new(
2541            bar_type,
2542            start,
2543            end,
2544            None,
2545            Some(client_id),
2546            UUID4::new(),
2547            get_atomic_clock_realtime().get_time_ns(),
2548            None,
2549        );
2550
2551        // We only verify that the partitioning logic executes without panicking;
2552        // HTTP calls are allowed to fail and are handled internally.
2553        assert!(client.request_bars(&request).is_ok());
2554    }
2555
2556    #[tokio::test]
2557    async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2558        setup_test_env();
2559
2560        // Prepare a simple candles response served by a local Axum HTTP server.
2561        let now = Utc::now();
2562        let candle = crate::http::models::Candle {
2563            started_at: now - chrono::Duration::minutes(1),
2564            ticker: "BTC-USD".to_string(),
2565            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2566            open: dec!(100.0),
2567            high: dec!(101.0),
2568            low: dec!(99.0),
2569            close: dec!(100.5),
2570            base_token_volume: dec!(1.0),
2571            usd_volume: dec!(100.0),
2572            trades: 10,
2573            starting_open_interest: dec!(1000.0),
2574        };
2575        let candles_response = crate::http::models::CandlesResponse {
2576            candles: vec![candle],
2577        };
2578        let state = CandlesTestState {
2579            response: Arc::new(candles_response),
2580        };
2581        let addr = start_candles_test_server(state).await;
2582        let base_url = format!("http://{addr}");
2583
2584        let client_id = ClientId::from("DYDX-BARS-MONTHS");
2585        let config = DydxDataClientConfig {
2586            base_url_http: Some(base_url),
2587            is_testnet: true,
2588            ..Default::default()
2589        };
2590
2591        let http_client = DydxHttpClient::new(
2592            config.base_url_http.clone(),
2593            config.http_timeout_secs,
2594            config.http_proxy_url.clone(),
2595            config.is_testnet,
2596            None,
2597        )
2598        .unwrap();
2599
2600        let client =
2601            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2602
2603        // Seed instrument cache so request_bars can resolve precision.
2604        let instrument = create_test_instrument_any();
2605        let instrument_id = instrument.id();
2606        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2607        client.instruments.insert(symbol_key, instrument);
2608
2609        let spec = BarSpecification {
2610            step: std::num::NonZeroUsize::new(1).unwrap(),
2611            aggregation: BarAggregation::Minute,
2612            price_type: PriceType::Last,
2613        };
2614        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2615
2616        // Use a date range spanning multiple months to exercise partitioning math.
2617        let start = Some(now - chrono::Duration::days(90));
2618        let end = Some(now);
2619
2620        // Limit the total number of bars so the test completes quickly.
2621        let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2622
2623        let request = RequestBars::new(
2624            bar_type,
2625            start,
2626            end,
2627            limit,
2628            Some(client_id),
2629            UUID4::new(),
2630            get_atomic_clock_realtime().get_time_ns(),
2631            None,
2632        );
2633
2634        assert!(client.request_bars(&request).is_ok());
2635    }
2636
2637    #[derive(Clone)]
2638    struct OrderbookTestState {
2639        snapshot: Arc<crate::http::models::OrderbookResponse>,
2640    }
2641
2642    #[derive(Clone)]
2643    struct TradesTestState {
2644        response: Arc<crate::http::models::TradesResponse>,
2645        last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
2646        last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
2647    }
2648
2649    #[derive(Clone)]
2650    struct CandlesTestState {
2651        response: Arc<crate::http::models::CandlesResponse>,
2652    }
2653
2654    async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2655        async fn handle_orderbook(
2656            Path(_ticker): Path<String>,
2657            State(state): State<OrderbookTestState>,
2658        ) -> Json<crate::http::models::OrderbookResponse> {
2659            Json((*state.snapshot).clone())
2660        }
2661
2662        let router = Router::new().route(
2663            "/v4/orderbooks/perpetualMarket/{ticker}",
2664            get(handle_orderbook).with_state(state),
2665        );
2666
2667        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2668        let addr = listener.local_addr().unwrap();
2669
2670        get_runtime().spawn(async move {
2671            axum::serve(listener, router.into_make_service())
2672                .await
2673                .unwrap();
2674        });
2675
2676        wait_for_server(addr).await;
2677        addr
2678    }
2679
2680    async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2681        async fn handle_trades(
2682            Path(ticker): Path<String>,
2683            Query(params): Query<HashMap<String, String>>,
2684            State(state): State<TradesTestState>,
2685        ) -> Json<crate::http::models::TradesResponse> {
2686            {
2687                let mut last_ticker = state.last_ticker.lock().await;
2688                *last_ticker = Some(ticker);
2689            }
2690
2691            let limit = params
2692                .get("limit")
2693                .and_then(|value| value.parse::<u32>().ok());
2694            {
2695                let mut last_limit = state.last_limit.lock().await;
2696                *last_limit = Some(limit);
2697            }
2698
2699            Json((*state.response).clone())
2700        }
2701
2702        let router = Router::new().route(
2703            "/v4/trades/perpetualMarket/{ticker}",
2704            get(handle_trades).with_state(state),
2705        );
2706
2707        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2708        let addr = listener.local_addr().unwrap();
2709
2710        get_runtime().spawn(async move {
2711            axum::serve(listener, router.into_make_service())
2712                .await
2713                .unwrap();
2714        });
2715
2716        wait_for_server(addr).await;
2717        addr
2718    }
2719
2720    async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2721        async fn handle_candles(
2722            Path(_ticker): Path<String>,
2723            Query(_params): Query<HashMap<String, String>>,
2724            State(state): State<CandlesTestState>,
2725        ) -> Json<crate::http::models::CandlesResponse> {
2726            Json((*state.response).clone())
2727        }
2728
2729        let router = Router::new().route(
2730            "/v4/candles/perpetualMarkets/{ticker}",
2731            get(handle_candles).with_state(state),
2732        );
2733
2734        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2735        let addr = listener.local_addr().unwrap();
2736
2737        get_runtime().spawn(async move {
2738            axum::serve(listener, router.into_make_service())
2739                .await
2740                .unwrap();
2741        });
2742
2743        wait_for_server(addr).await;
2744        addr
2745    }
2746
2747    fn create_test_instrument_any() -> InstrumentAny {
2748        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2749
2750        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2751            instrument_id,
2752            instrument_id.symbol,
2753            Currency::BTC(),
2754            Currency::USD(),
2755            Currency::USD(),
2756            false,
2757            2,                                // price_precision
2758            8,                                // size_precision
2759            Price::new(0.01, 2),              // price_increment
2760            Quantity::new(0.001, 8),          // size_increment
2761            Some(Quantity::new(1.0, 0)),      // multiplier
2762            Some(Quantity::new(0.001, 8)),    // lot_size
2763            Some(Quantity::new(100000.0, 8)), // max_quantity
2764            Some(Quantity::new(0.001, 8)),    // min_quantity
2765            None,                             // max_notional
2766            None,                             // min_notional
2767            Some(Price::new(1000000.0, 2)),   // max_price
2768            Some(Price::new(0.01, 2)),        // min_price
2769            Some(dec!(0.05)),                 // margin_init
2770            Some(dec!(0.03)),                 // margin_maint
2771            Some(dec!(0.0002)),               // maker_fee
2772            Some(dec!(0.0005)),               // taker_fee
2773            UnixNanos::default(),             // ts_event
2774            UnixNanos::default(),             // ts_init
2775        ))
2776    }
2777
2778    // ------------------------------------------------------------------------
2779    // Precision & bar conversion tests
2780    // ------------------------------------------------------------------------
2781
2782    #[tokio::test]
2783    async fn test_candle_to_bar_price_size_edge_cases() {
2784        setup_test_env();
2785
2786        let clock = get_atomic_clock_realtime();
2787        let now = Utc::now();
2788
2789        // Very large prices and sizes (edge cases).
2790        let candle = Candle {
2791            started_at: now,
2792            ticker: "BTC-USD".to_string(),
2793            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2794            open: dec!(123456789.123456),
2795            high: dec!(987654321.987654),  // high is max
2796            low: dec!(123456.789),         // low is min
2797            close: dec!(223456789.123456), // close between low and high
2798            base_token_volume: dec!(0.00000001),
2799            usd_volume: dec!(1234500.0),
2800            trades: 42,
2801            starting_open_interest: dec!(1000.0),
2802        };
2803
2804        let instrument = create_test_instrument_any();
2805        let instrument_id = instrument.id();
2806        let spec = BarSpecification {
2807            step: std::num::NonZeroUsize::new(1).unwrap(),
2808            aggregation: BarAggregation::Minute,
2809            price_type: PriceType::Last,
2810        };
2811        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2812
2813        let bar = DydxDataClient::candle_to_bar(
2814            &candle,
2815            bar_type,
2816            instrument.price_precision(),
2817            instrument.size_precision(),
2818            60,
2819            clock,
2820        )
2821        .expect("candle_to_bar should handle large/scientific values");
2822
2823        assert!(bar.open.as_f64() > 0.0);
2824        assert!(bar.high.as_f64() >= bar.low.as_f64());
2825        assert!(bar.volume.as_f64() > 0.0);
2826    }
2827
2828    #[tokio::test]
2829    async fn test_candle_to_bar_ts_event_overflow_safe() {
2830        setup_test_env();
2831
2832        let clock = get_atomic_clock_realtime();
2833        let now = Utc::now();
2834
2835        let candle = Candle {
2836            started_at: now,
2837            ticker: "BTC-USD".to_string(),
2838            resolution: crate::common::enums::DydxCandleResolution::OneDay,
2839            open: Decimal::from(1),
2840            high: Decimal::from(1),
2841            low: Decimal::from(1),
2842            close: Decimal::from(1),
2843            base_token_volume: Decimal::from(1),
2844            usd_volume: Decimal::from(1),
2845            trades: 1,
2846            starting_open_interest: Decimal::from(1),
2847        };
2848
2849        let instrument = create_test_instrument_any();
2850        let instrument_id = instrument.id();
2851        let spec = BarSpecification {
2852            step: std::num::NonZeroUsize::new(1).unwrap(),
2853            aggregation: BarAggregation::Day,
2854            price_type: PriceType::Last,
2855        };
2856        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2857
2858        // Use an intentionally large bar_secs to exercise saturating_add path.
2859        let bar_secs = i64::MAX / 1_000_000_000;
2860        let bar = DydxDataClient::candle_to_bar(
2861            &candle,
2862            bar_type,
2863            instrument.price_precision(),
2864            instrument.size_precision(),
2865            bar_secs,
2866            clock,
2867        )
2868        .expect("candle_to_bar should not overflow on ts_event");
2869
2870        assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2871    }
2872
2873    #[tokio::test]
2874    async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2875        // Simulate bars with ts_event both before and after current_time_ns and
2876        // ensure only completed bars (ts_event < now) are retained.
2877        let clock = get_atomic_clock_realtime();
2878        let now = Utc::now();
2879
2880        // Use a dedicated data channel for this test and register it
2881        // before constructing the data client.
2882        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2883        set_data_event_sender(sender);
2884
2885        // two candles: one in the past, one in the future
2886        let candle_past = Candle {
2887            started_at: now - chrono::Duration::minutes(2),
2888            ticker: "BTC-USD".to_string(),
2889            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2890            open: Decimal::from(1),
2891            high: Decimal::from(2),
2892            low: Decimal::from(1),
2893            close: Decimal::from(1),
2894            base_token_volume: Decimal::from(1),
2895            usd_volume: Decimal::from(1),
2896            trades: 1,
2897            starting_open_interest: Decimal::from(1),
2898        };
2899        let candle_future = Candle {
2900            started_at: now + chrono::Duration::minutes(2),
2901            ..candle_past.clone()
2902        };
2903
2904        let candles_response = CandlesResponse {
2905            candles: vec![candle_past, candle_future],
2906        };
2907
2908        let state = CandlesTestState {
2909            response: Arc::new(candles_response),
2910        };
2911        let addr = start_candles_test_server(state).await;
2912        let base_url = format!("http://{addr}");
2913
2914        let client_id = ClientId::from("DYDX-BARS-SKEW");
2915        let config = DydxDataClientConfig {
2916            base_url_http: Some(base_url),
2917            is_testnet: true,
2918            ..Default::default()
2919        };
2920
2921        let http_client = DydxHttpClient::new(
2922            config.base_url_http.clone(),
2923            config.http_timeout_secs,
2924            config.http_proxy_url.clone(),
2925            config.is_testnet,
2926            None,
2927        )
2928        .unwrap();
2929
2930        let client =
2931            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2932
2933        let instrument = create_test_instrument_any();
2934        let instrument_id = instrument.id();
2935        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
2936        client.instruments.insert(symbol_key, instrument);
2937
2938        let spec = BarSpecification {
2939            step: std::num::NonZeroUsize::new(1).unwrap(),
2940            aggregation: BarAggregation::Minute,
2941            price_type: PriceType::Last,
2942        };
2943        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2944
2945        let request = RequestBars::new(
2946            bar_type,
2947            Some(now - chrono::Duration::minutes(5)),
2948            Some(now + chrono::Duration::minutes(5)),
2949            None,
2950            Some(client_id),
2951            UUID4::new(),
2952            clock.get_time_ns(),
2953            None,
2954        );
2955
2956        assert!(client.request_bars(&request).is_ok());
2957
2958        let timeout = tokio::time::Duration::from_secs(3);
2959        if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
2960            tokio::time::timeout(timeout, rx.recv()).await
2961        {
2962            // Only the past candle should remain after filtering.
2963            assert_eq!(resp.data.len(), 1);
2964        }
2965    }
2966
2967    #[rstest]
2968    fn test_decimal_to_f64_precision_loss_within_tolerance() {
2969        // Verify converting via Price/Quantity preserves reasonable precision.
2970        let price_value = 12345.125_f64;
2971        let qty_value = 0.00012345_f64;
2972
2973        let price = Price::new(price_value, 6);
2974        let qty = Quantity::new(qty_value, 8);
2975
2976        let price_diff = (price.as_f64() - price_value).abs();
2977        let qty_diff = (qty.as_f64() - qty_value).abs();
2978
2979        // Differences should be well within a tiny epsilon.
2980        assert!(price_diff < 1e-10);
2981        assert!(qty_diff < 1e-12);
2982    }
2983
2984    #[tokio::test]
2985    async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
2986        // Set up a dedicated data event channel for this test.
2987        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2988        set_data_event_sender(sender);
2989
2990        // Prepare a static orderbook snapshot served by a local Axum HTTP server.
2991        let snapshot = crate::http::models::OrderbookResponse {
2992            bids: vec![crate::http::models::OrderbookLevel {
2993                price: dec!(100.0),
2994                size: dec!(1.0),
2995            }],
2996            asks: vec![crate::http::models::OrderbookLevel {
2997                price: dec!(101.0),
2998                size: dec!(2.0),
2999            }],
3000        };
3001        let state = OrderbookTestState {
3002            snapshot: Arc::new(snapshot),
3003        };
3004        let addr = start_orderbook_test_server(state).await;
3005        let base_url = format!("http://{addr}");
3006
3007        // Configure the data client with a short refresh interval and mock HTTP base URL.
3008        let client_id = ClientId::from("DYDX-REFRESH");
3009        let config = DydxDataClientConfig {
3010            is_testnet: true,
3011            base_url_http: Some(base_url),
3012            orderbook_refresh_interval_secs: Some(1),
3013            instrument_refresh_interval_secs: None,
3014            ..Default::default()
3015        };
3016
3017        let http_client = DydxHttpClient::new(
3018            config.base_url_http.clone(),
3019            config.http_timeout_secs,
3020            config.http_proxy_url.clone(),
3021            config.is_testnet,
3022            None,
3023        )
3024        .unwrap();
3025
3026        let mut client =
3027            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3028
3029        // Seed instruments and orderbook state for a single instrument.
3030        let instrument = create_test_instrument_any();
3031        let instrument_id = instrument.id();
3032        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3033        client.instruments.insert(symbol_key, instrument);
3034        client.order_books.insert(
3035            instrument_id,
3036            OrderBook::new(instrument_id, BookType::L2_MBP),
3037        );
3038        client.active_orderbook_subs.insert(instrument_id, ());
3039
3040        // Start the refresh task and wait for a snapshot to be applied and emitted.
3041        client.start_orderbook_refresh_task().unwrap();
3042
3043        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3044        let mut saw_snapshot_event = false;
3045
3046        while std::time::Instant::now() < deadline {
3047            if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3048                tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3049            {
3050                saw_snapshot_event = true;
3051                break;
3052            }
3053        }
3054
3055        assert!(
3056            saw_snapshot_event,
3057            "expected at least one snapshot deltas event from refresh task"
3058        );
3059
3060        // Verify that the local orderbook has been updated with the snapshot.
3061        let book = client
3062            .order_books
3063            .get(&instrument_id)
3064            .expect("orderbook should exist after refresh");
3065        let best_bid = book.best_bid_price().expect("best bid should be set");
3066        let best_ask = book.best_ask_price().expect("best ask should be set");
3067
3068        assert_eq!(best_bid, Price::from("100.00"));
3069        assert_eq!(best_ask, Price::from("101.00"));
3070    }
3071
3072    #[rstest]
3073    fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3074        // Test scenario: bid_size > ask_size
3075        // Expected: DELETE ask, UPDATE bid (reduce by ask_size)
3076        let instrument = create_test_instrument_any();
3077        let instrument_id = instrument.id();
3078        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3079        let ts_init = get_atomic_clock_realtime().get_time_ns();
3080
3081        // Create initial non-crossed book
3082        let initial_deltas = vec![
3083            OrderBookDelta::new(
3084                instrument_id,
3085                BookAction::Add,
3086                BookOrder::new(
3087                    OrderSide::Buy,
3088                    Price::from("99.00"),
3089                    Quantity::from("1.0"),
3090                    0,
3091                ),
3092                0,
3093                0,
3094                ts_init,
3095                ts_init,
3096            ),
3097            OrderBookDelta::new(
3098                instrument_id,
3099                BookAction::Add,
3100                BookOrder::new(
3101                    OrderSide::Sell,
3102                    Price::from("101.00"),
3103                    Quantity::from("2.0"),
3104                    0,
3105                ),
3106                0,
3107                0,
3108                ts_init,
3109                ts_init,
3110            ),
3111        ];
3112        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3113            .unwrap();
3114
3115        // Create crossed orderbook: bid @ 102.00 (size 5.0) > ask @ 101.00 (size 2.0)
3116        let crossed_deltas = vec![OrderBookDelta::new(
3117            instrument_id,
3118            BookAction::Add,
3119            BookOrder::new(
3120                OrderSide::Buy,
3121                Price::from("102.00"),
3122                Quantity::from("5.0"),
3123                0,
3124            ),
3125            0,
3126            0,
3127            ts_init,
3128            ts_init,
3129        )];
3130        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3131
3132        let resolved =
3133            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3134                .unwrap();
3135
3136        // Verify resolution: ask @ 101.00 should be deleted
3137        // bid @ 102.00 should remain but reduced (note: precision affects exact value)
3138        assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3139        assert!(book.best_bid_size().unwrap().as_f64() < 5.0); // Reduced from original
3140        assert!(
3141            book.best_ask_price().is_none()
3142                || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3143        ); // No longer crossed
3144
3145        // Verify synthetic deltas were generated
3146        assert!(resolved.deltas.len() > 1); // Original delta + synthetic resolution deltas
3147    }
3148
3149    #[rstest]
3150    fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3151        // Test scenario: bid_size < ask_size
3152        // Expected: DELETE bid, UPDATE ask (reduce by bid_size)
3153        let instrument = create_test_instrument_any();
3154        let instrument_id = instrument.id();
3155        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3156        let ts_init = get_atomic_clock_realtime().get_time_ns();
3157
3158        // Create initial non-crossed book
3159        let initial_deltas = vec![
3160            OrderBookDelta::new(
3161                instrument_id,
3162                BookAction::Add,
3163                BookOrder::new(
3164                    OrderSide::Buy,
3165                    Price::from("99.00"),
3166                    Quantity::from("1.0"),
3167                    0,
3168                ),
3169                0,
3170                0,
3171                ts_init,
3172                ts_init,
3173            ),
3174            OrderBookDelta::new(
3175                instrument_id,
3176                BookAction::Add,
3177                BookOrder::new(
3178                    OrderSide::Sell,
3179                    Price::from("101.00"),
3180                    Quantity::from("5.0"),
3181                    0,
3182                ),
3183                0,
3184                0,
3185                ts_init,
3186                ts_init,
3187            ),
3188        ];
3189        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3190            .unwrap();
3191
3192        // Create crossed orderbook: bid @ 102.00 (size 2.0) < ask @ 101.00 (size 5.0)
3193        let crossed_deltas = vec![OrderBookDelta::new(
3194            instrument_id,
3195            BookAction::Add,
3196            BookOrder::new(
3197                OrderSide::Buy,
3198                Price::from("102.00"),
3199                Quantity::from("2.0"),
3200                0,
3201            ),
3202            0,
3203            0,
3204            ts_init,
3205            ts_init,
3206        )];
3207        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3208
3209        let resolved =
3210            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3211                .unwrap();
3212
3213        // Verify resolution: bid @ 102.00 should be deleted, ask @ 101.00 reduced
3214        assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3215        assert!(book.best_ask_size().unwrap().as_f64() < 5.0); // Reduced from original
3216        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level remains
3217        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); // No longer crossed
3218
3219        // Verify synthetic deltas were generated
3220        assert!(resolved.deltas.len() > 1);
3221    }
3222
3223    #[rstest]
3224    fn test_resolve_crossed_order_book_equal_sizes() {
3225        // Test scenario: bid_size == ask_size
3226        // Expected: DELETE both bid and ask
3227        let instrument = create_test_instrument_any();
3228        let instrument_id = instrument.id();
3229        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3230        let ts_init = get_atomic_clock_realtime().get_time_ns();
3231
3232        // Create initial non-crossed book with multiple levels
3233        let initial_deltas = vec![
3234            OrderBookDelta::new(
3235                instrument_id,
3236                BookAction::Add,
3237                BookOrder::new(
3238                    OrderSide::Buy,
3239                    Price::from("99.00"),
3240                    Quantity::from("1.0"),
3241                    0,
3242                ),
3243                0,
3244                0,
3245                ts_init,
3246                ts_init,
3247            ),
3248            OrderBookDelta::new(
3249                instrument_id,
3250                BookAction::Add,
3251                BookOrder::new(
3252                    OrderSide::Sell,
3253                    Price::from("101.00"),
3254                    Quantity::from("3.0"),
3255                    0,
3256                ),
3257                0,
3258                0,
3259                ts_init,
3260                ts_init,
3261            ),
3262        ];
3263        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3264            .unwrap();
3265
3266        // Create crossed orderbook: bid @ 102.00 (size 3.0) == ask @ 101.00 (size 3.0)
3267        let crossed_deltas = vec![OrderBookDelta::new(
3268            instrument_id,
3269            BookAction::Add,
3270            BookOrder::new(
3271                OrderSide::Buy,
3272                Price::from("102.00"),
3273                Quantity::from("3.0"),
3274                0,
3275            ),
3276            0,
3277            0,
3278            ts_init,
3279            ts_init,
3280        )];
3281        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3282
3283        let resolved =
3284            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3285                .unwrap();
3286
3287        // Verify resolution: both crossed levels should be deleted, reverting to deeper levels
3288        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level
3289        // Ask at 101.00 should be deleted, book may be empty on ask side or have deeper levels
3290        if let Some(ask_price) = book.best_ask_price() {
3291            assert!(ask_price > book.best_bid_price().unwrap()); // No longer crossed
3292        }
3293
3294        // Verify synthetic deltas were generated
3295        assert!(resolved.deltas.len() > 1);
3296    }
3297
3298    #[rstest]
3299    fn test_resolve_crossed_order_book_multiple_iterations() {
3300        // Test scenario: multiple crossed levels requiring multiple iterations
3301        let instrument = create_test_instrument_any();
3302        let instrument_id = instrument.id();
3303        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3304        let ts_init = get_atomic_clock_realtime().get_time_ns();
3305
3306        // Create initial book with multiple levels on both sides
3307        let initial_deltas = vec![
3308            OrderBookDelta::new(
3309                instrument_id,
3310                BookAction::Add,
3311                BookOrder::new(
3312                    OrderSide::Buy,
3313                    Price::from("98.00"),
3314                    Quantity::from("1.0"),
3315                    0,
3316                ),
3317                0,
3318                0,
3319                ts_init,
3320                ts_init,
3321            ),
3322            OrderBookDelta::new(
3323                instrument_id,
3324                BookAction::Add,
3325                BookOrder::new(
3326                    OrderSide::Sell,
3327                    Price::from("100.00"),
3328                    Quantity::from("1.0"),
3329                    0,
3330                ),
3331                0,
3332                0,
3333                ts_init,
3334                ts_init,
3335            ),
3336            OrderBookDelta::new(
3337                instrument_id,
3338                BookAction::Add,
3339                BookOrder::new(
3340                    OrderSide::Sell,
3341                    Price::from("101.00"),
3342                    Quantity::from("1.0"),
3343                    0,
3344                ),
3345                0,
3346                0,
3347                ts_init,
3348                ts_init,
3349            ),
3350        ];
3351        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3352            .unwrap();
3353
3354        // Create heavily crossed orderbook with multiple bids above asks
3355        let crossed_deltas = vec![
3356            OrderBookDelta::new(
3357                instrument_id,
3358                BookAction::Add,
3359                BookOrder::new(
3360                    OrderSide::Buy,
3361                    Price::from("102.00"),
3362                    Quantity::from("1.0"),
3363                    0,
3364                ),
3365                0,
3366                0,
3367                ts_init,
3368                ts_init,
3369            ),
3370            OrderBookDelta::new(
3371                instrument_id,
3372                BookAction::Add,
3373                BookOrder::new(
3374                    OrderSide::Buy,
3375                    Price::from("103.00"),
3376                    Quantity::from("1.0"),
3377                    0,
3378                ),
3379                0,
3380                0,
3381                ts_init,
3382                ts_init,
3383            ),
3384        ];
3385        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3386
3387        let resolved =
3388            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3389                .unwrap();
3390
3391        // Verify final state is uncrossed (or book has no asks left)
3392        if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3393            assert!(ask_price > bid_price, "Book should be uncrossed");
3394        }
3395
3396        // Verify multiple synthetic deltas were generated for multiple iterations
3397        assert!(resolved.deltas.len() > 2); // Original deltas + multiple resolution passes
3398    }
3399
3400    #[rstest]
3401    fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3402        // Test scenario: non-crossed orderbook should pass through unchanged
3403        let instrument = create_test_instrument_any();
3404        let instrument_id = instrument.id();
3405        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3406        let ts_init = get_atomic_clock_realtime().get_time_ns();
3407
3408        // Create normal non-crossed book
3409        let initial_deltas = vec![
3410            OrderBookDelta::new(
3411                instrument_id,
3412                BookAction::Add,
3413                BookOrder::new(
3414                    OrderSide::Buy,
3415                    Price::from("99.00"),
3416                    Quantity::from("1.0"),
3417                    0,
3418                ),
3419                0,
3420                0,
3421                ts_init,
3422                ts_init,
3423            ),
3424            OrderBookDelta::new(
3425                instrument_id,
3426                BookAction::Add,
3427                BookOrder::new(
3428                    OrderSide::Sell,
3429                    Price::from("101.00"),
3430                    Quantity::from("1.0"),
3431                    0,
3432                ),
3433                0,
3434                0,
3435                ts_init,
3436                ts_init,
3437            ),
3438        ];
3439        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3440            .unwrap();
3441
3442        // Add another non-crossing level
3443        let new_deltas = vec![OrderBookDelta::new(
3444            instrument_id,
3445            BookAction::Add,
3446            BookOrder::new(
3447                OrderSide::Buy,
3448                Price::from("98.50"),
3449                Quantity::from("2.0"),
3450                0,
3451            ),
3452            0,
3453            0,
3454            ts_init,
3455            ts_init,
3456        )];
3457        let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3458
3459        let original_bid = book.best_bid_price();
3460        let original_ask = book.best_ask_price();
3461
3462        let resolved =
3463            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3464                .unwrap();
3465
3466        // Verify no resolution was needed - deltas should be original only
3467        assert_eq!(resolved.deltas.len(), new_deltas.len());
3468        assert_eq!(book.best_bid_price(), original_bid);
3469        assert_eq!(book.best_ask_price(), original_ask);
3470        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3471    }
3472
3473    #[tokio::test]
3474    async fn test_request_instruments_successful_fetch() {
3475        // Test successful fetch of all instruments
3476        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3477        set_data_event_sender(sender);
3478
3479        let client_id = ClientId::from("DYDX-TEST");
3480        let config = DydxDataClientConfig::default();
3481        let http_client = DydxHttpClient::default();
3482        let client =
3483            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3484
3485        let request = RequestInstruments::new(
3486            None,
3487            None,
3488            Some(client_id),
3489            Some(*DYDX_VENUE),
3490            UUID4::new(),
3491            get_atomic_clock_realtime().get_time_ns(),
3492            None,
3493        );
3494
3495        // Execute request (spawns async task)
3496        assert!(client.request_instruments(&request).is_ok());
3497
3498        // Wait for response (with timeout)
3499        let timeout = tokio::time::Duration::from_secs(5);
3500        let result = tokio::time::timeout(timeout, rx.recv()).await;
3501
3502        match result {
3503            Ok(Some(DataEvent::Response(resp))) => {
3504                if let DataResponse::Instruments(inst_resp) = resp {
3505                    // Verify response structure
3506                    assert_eq!(inst_resp.correlation_id, request.request_id);
3507                    assert_eq!(inst_resp.client_id, client_id);
3508                    assert_eq!(inst_resp.venue, *DYDX_VENUE);
3509                    assert!(inst_resp.start.is_none());
3510                    assert!(inst_resp.end.is_none());
3511                    // Note: may be empty if HTTP fails, but structure should be correct
3512                }
3513            }
3514            Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3515            Ok(None) => panic!("Channel closed unexpectedly"),
3516            Err(_) => {
3517                // Timeout is acceptable if testnet is unreachable
3518                println!("Test timed out - testnet may be unreachable");
3519            }
3520        }
3521    }
3522
3523    #[tokio::test]
3524    async fn test_request_instruments_empty_response_on_http_error() {
3525        // Test empty response handling when HTTP call fails
3526        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3527        set_data_event_sender(sender);
3528
3529        let client_id = ClientId::from("DYDX-ERROR-TEST");
3530        let config = DydxDataClientConfig {
3531            base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3532            ..Default::default()
3533        };
3534        let http_client = DydxHttpClient::new(
3535            config.base_url_http.clone(),
3536            config.http_timeout_secs,
3537            config.http_proxy_url.clone(),
3538            config.is_testnet,
3539            None,
3540        )
3541        .unwrap();
3542
3543        let client =
3544            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3545
3546        let request = RequestInstruments::new(
3547            None,
3548            None,
3549            Some(client_id),
3550            Some(*DYDX_VENUE),
3551            UUID4::new(),
3552            get_atomic_clock_realtime().get_time_ns(),
3553            None,
3554        );
3555
3556        assert!(client.request_instruments(&request).is_ok());
3557
3558        // Should receive empty response on error
3559        let timeout = tokio::time::Duration::from_secs(3);
3560        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3561            tokio::time::timeout(timeout, rx.recv()).await
3562        {
3563            assert!(
3564                resp.data.is_empty(),
3565                "Expected empty instruments on HTTP error"
3566            );
3567            assert_eq!(resp.correlation_id, request.request_id);
3568            assert_eq!(resp.client_id, client_id);
3569        }
3570    }
3571
3572    #[tokio::test]
3573    async fn test_request_instruments_caching() {
3574        // Test instrument caching after fetch
3575        setup_test_env();
3576
3577        let client_id = ClientId::from("DYDX-CACHE-TEST");
3578        let config = DydxDataClientConfig::default();
3579        let http_client = DydxHttpClient::default();
3580        let client =
3581            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3582
3583        let initial_cache_size = client.instruments.len();
3584
3585        let request = RequestInstruments::new(
3586            None,
3587            None,
3588            Some(client_id),
3589            Some(*DYDX_VENUE),
3590            UUID4::new(),
3591            get_atomic_clock_realtime().get_time_ns(),
3592            None,
3593        );
3594
3595        assert!(client.request_instruments(&request).is_ok());
3596
3597        // Wait for async task to complete
3598        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3599
3600        // Verify cache populated (if HTTP succeeded)
3601        let final_cache_size = client.instruments.len();
3602        // Cache should be unchanged (empty) if HTTP failed, or populated if succeeded
3603        // We can't assert exact size without mocking, but can verify no panic
3604        assert!(final_cache_size >= initial_cache_size);
3605    }
3606
3607    #[tokio::test]
3608    async fn test_request_instruments_correlation_id_matching() {
3609        // Test correlation_id matching in response
3610        setup_test_env();
3611
3612        let client_id = ClientId::from("DYDX-CORR-TEST");
3613        let config = DydxDataClientConfig::default();
3614        let http_client = DydxHttpClient::default();
3615        let client =
3616            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3617
3618        let request_id = UUID4::new();
3619        let request = RequestInstruments::new(
3620            None,
3621            None,
3622            Some(client_id),
3623            Some(*DYDX_VENUE),
3624            request_id,
3625            get_atomic_clock_realtime().get_time_ns(),
3626            None,
3627        );
3628
3629        // Should execute without panic (actual correlation checked in async handler)
3630        assert!(client.request_instruments(&request).is_ok());
3631    }
3632
3633    #[tokio::test]
3634    async fn test_request_instruments_venue_assignment() {
3635        // Test venue assignment
3636        setup_test_env();
3637
3638        let client_id = ClientId::from("DYDX-VENUE-TEST");
3639        let config = DydxDataClientConfig::default();
3640        let http_client = DydxHttpClient::default();
3641        let client =
3642            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3643
3644        assert_eq!(client.venue(), *DYDX_VENUE);
3645
3646        let request = RequestInstruments::new(
3647            None,
3648            None,
3649            Some(client_id),
3650            Some(*DYDX_VENUE),
3651            UUID4::new(),
3652            get_atomic_clock_realtime().get_time_ns(),
3653            None,
3654        );
3655
3656        assert!(client.request_instruments(&request).is_ok());
3657    }
3658
3659    #[tokio::test]
3660    async fn test_request_instruments_timestamp_handling() {
3661        // Test timestamp handling (start_nanos, end_nanos)
3662        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3663        set_data_event_sender(sender);
3664
3665        let client_id = ClientId::from("DYDX-TS-TEST");
3666        let config = DydxDataClientConfig::default();
3667        let http_client = DydxHttpClient::default();
3668        let client =
3669            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3670
3671        let now = Utc::now();
3672        let start = Some(now - chrono::Duration::hours(24));
3673        let end = Some(now);
3674
3675        let request = RequestInstruments::new(
3676            start,
3677            end,
3678            Some(client_id),
3679            Some(*DYDX_VENUE),
3680            UUID4::new(),
3681            get_atomic_clock_realtime().get_time_ns(),
3682            None,
3683        );
3684
3685        assert!(client.request_instruments(&request).is_ok());
3686
3687        // Wait for response
3688        let timeout = tokio::time::Duration::from_secs(3);
3689        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3690            tokio::time::timeout(timeout, rx.recv()).await
3691        {
3692            // Verify timestamps are set
3693            assert!(resp.start.unwrap() > 0);
3694            assert!(resp.end.unwrap() > 0);
3695            assert!(resp.start.unwrap() <= resp.end.unwrap());
3696            assert!(resp.ts_init > 0);
3697        }
3698    }
3699
3700    #[tokio::test]
3701    async fn test_request_instruments_with_start_only() {
3702        // Test timestamp handling when only `start` is provided
3703        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3704        set_data_event_sender(sender);
3705
3706        let client_id = ClientId::from("DYDX-TS-START-ONLY");
3707        let config = DydxDataClientConfig::default();
3708        let http_client = DydxHttpClient::default();
3709        let client =
3710            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3711
3712        let now = Utc::now();
3713        let start = Some(now - chrono::Duration::hours(24));
3714
3715        let request = RequestInstruments::new(
3716            start,
3717            None,
3718            Some(client_id),
3719            Some(*DYDX_VENUE),
3720            UUID4::new(),
3721            get_atomic_clock_realtime().get_time_ns(),
3722            None,
3723        );
3724
3725        assert!(client.request_instruments(&request).is_ok());
3726
3727        let timeout = tokio::time::Duration::from_secs(3);
3728        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3729            tokio::time::timeout(timeout, rx.recv()).await
3730        {
3731            assert!(resp.start.is_some());
3732            assert!(resp.end.is_none());
3733            assert!(resp.ts_init > 0);
3734        }
3735    }
3736
3737    #[tokio::test]
3738    async fn test_request_instruments_with_end_only() {
3739        // Test timestamp handling when only `end` is provided
3740        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3741        set_data_event_sender(sender);
3742
3743        let client_id = ClientId::from("DYDX-TS-END-ONLY");
3744        let config = DydxDataClientConfig::default();
3745        let http_client = DydxHttpClient::default();
3746        let client =
3747            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3748
3749        let now = Utc::now();
3750        let end = Some(now);
3751
3752        let request = RequestInstruments::new(
3753            None,
3754            end,
3755            Some(client_id),
3756            Some(*DYDX_VENUE),
3757            UUID4::new(),
3758            get_atomic_clock_realtime().get_time_ns(),
3759            None,
3760        );
3761
3762        assert!(client.request_instruments(&request).is_ok());
3763
3764        let timeout = tokio::time::Duration::from_secs(3);
3765        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3766            tokio::time::timeout(timeout, rx.recv()).await
3767        {
3768            assert!(resp.start.is_none());
3769            assert!(resp.end.is_some());
3770            assert!(resp.ts_init > 0);
3771        }
3772    }
3773
3774    #[tokio::test]
3775    async fn test_request_instruments_client_id_fallback() {
3776        // Test client_id fallback to default when not provided
3777        setup_test_env();
3778
3779        let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3780        let config = DydxDataClientConfig::default();
3781        let http_client = DydxHttpClient::default();
3782        let client =
3783            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3784
3785        let request = RequestInstruments::new(
3786            None,
3787            None,
3788            None, // No client_id provided
3789            Some(*DYDX_VENUE),
3790            UUID4::new(),
3791            get_atomic_clock_realtime().get_time_ns(),
3792            None,
3793        );
3794
3795        // Should use client's default client_id
3796        assert!(client.request_instruments(&request).is_ok());
3797    }
3798
3799    #[tokio::test]
3800    async fn test_request_instruments_with_params() {
3801        // Test custom params handling
3802        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3803        set_data_event_sender(sender);
3804
3805        let client_id = ClientId::from("DYDX-PARAMS-TEST");
3806        let config = DydxDataClientConfig::default();
3807        let http_client = DydxHttpClient::default();
3808        let client =
3809            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3810
3811        // Create params - just verify they're passed through
3812        let mut params_map = IndexMap::new();
3813        params_map.insert("test_key".to_string(), "test_value".to_string());
3814
3815        let request = RequestInstruments::new(
3816            None,
3817            None,
3818            Some(client_id),
3819            Some(*DYDX_VENUE),
3820            UUID4::new(),
3821            get_atomic_clock_realtime().get_time_ns(),
3822            Some(params_map),
3823        );
3824
3825        assert!(client.request_instruments(&request).is_ok());
3826
3827        // Wait for response
3828        let timeout = tokio::time::Duration::from_secs(3);
3829        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3830            tokio::time::timeout(timeout, rx.recv()).await
3831        {
3832            // Verify params are propagated into the response
3833            assert_eq!(resp.client_id, client_id);
3834            let params = resp
3835                .params
3836                .expect("expected params to be present in InstrumentsResponse");
3837            assert_eq!(
3838                params.get("test_key").map(String::as_str),
3839                Some("test_value")
3840            );
3841        }
3842    }
3843
3844    #[tokio::test]
3845    async fn test_request_instruments_with_start_and_end_range() {
3846        // Test timestamp handling when both start and end are provided
3847        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3848        set_data_event_sender(sender);
3849
3850        let client_id = ClientId::from("DYDX-START-END-RANGE");
3851        let config = DydxDataClientConfig::default();
3852        let http_client = DydxHttpClient::default();
3853        let client =
3854            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3855
3856        let now = Utc::now();
3857        let start = Some(now - chrono::Duration::hours(48));
3858        let end = Some(now - chrono::Duration::hours(24));
3859
3860        let request = RequestInstruments::new(
3861            start,
3862            end,
3863            Some(client_id),
3864            Some(*DYDX_VENUE),
3865            UUID4::new(),
3866            get_atomic_clock_realtime().get_time_ns(),
3867            None,
3868        );
3869
3870        assert!(client.request_instruments(&request).is_ok());
3871
3872        let timeout = tokio::time::Duration::from_secs(3);
3873        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3874            tokio::time::timeout(timeout, rx.recv()).await
3875        {
3876            // Verify both timestamps are present
3877            assert!(
3878                resp.start.is_some(),
3879                "start timestamp should be present when provided"
3880            );
3881            assert!(
3882                resp.end.is_some(),
3883                "end timestamp should be present when provided"
3884            );
3885            assert!(resp.ts_init > 0, "ts_init should always be set");
3886
3887            // Verify start is before end
3888            if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3889                assert!(
3890                    start_ts < end_ts,
3891                    "start timestamp should be before end timestamp"
3892                );
3893            }
3894        }
3895    }
3896
3897    #[tokio::test]
3898    async fn test_request_instruments_different_client_ids() {
3899        // Test that different client_id values are properly handled using a shared channel.
3900        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3901        set_data_event_sender(sender);
3902
3903        let timeout = tokio::time::Duration::from_secs(3);
3904
3905        // First client
3906        let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3907        let config1 = DydxDataClientConfig::default();
3908        let http_client1 = DydxHttpClient::default();
3909        let client1 =
3910            DydxDataClient::new(client_id_1, config1, http_client1, create_test_ws_client())
3911                .unwrap();
3912
3913        let request1 = RequestInstruments::new(
3914            None,
3915            None,
3916            Some(client_id_1),
3917            Some(*DYDX_VENUE),
3918            UUID4::new(),
3919            get_atomic_clock_realtime().get_time_ns(),
3920            None,
3921        );
3922
3923        assert!(client1.request_instruments(&request1).is_ok());
3924
3925        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
3926            tokio::time::timeout(timeout, rx.recv()).await
3927        {
3928            assert_eq!(
3929                resp1.client_id, client_id_1,
3930                "Response should contain client_id_1"
3931            );
3932        }
3933
3934        // Second client
3935        let client_id_2 = ClientId::from("DYDX-CLIENT-2");
3936        let config2 = DydxDataClientConfig::default();
3937        let http_client2 = DydxHttpClient::default();
3938        let client2 =
3939            DydxDataClient::new(client_id_2, config2, http_client2, create_test_ws_client())
3940                .unwrap();
3941
3942        let request2 = RequestInstruments::new(
3943            None,
3944            None,
3945            Some(client_id_2),
3946            Some(*DYDX_VENUE),
3947            UUID4::new(),
3948            get_atomic_clock_realtime().get_time_ns(),
3949            None,
3950        );
3951
3952        assert!(client2.request_instruments(&request2).is_ok());
3953
3954        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
3955            tokio::time::timeout(timeout, rx.recv()).await
3956        {
3957            assert_eq!(
3958                resp2.client_id, client_id_2,
3959                "Response should contain client_id_2"
3960            );
3961            assert_ne!(
3962                resp2.client_id, client_id_1,
3963                "Different clients should have different client_ids"
3964            );
3965        }
3966    }
3967
3968    #[tokio::test]
3969    async fn test_request_instruments_no_timestamps() {
3970        // Test fetching all current instruments (no start/end filters)
3971        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3972        set_data_event_sender(sender);
3973
3974        let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
3975        let config = DydxDataClientConfig::default();
3976        let http_client = DydxHttpClient::default();
3977        let client =
3978            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3979
3980        let request = RequestInstruments::new(
3981            None, // No start filter
3982            None, // No end filter
3983            Some(client_id),
3984            Some(*DYDX_VENUE),
3985            UUID4::new(),
3986            get_atomic_clock_realtime().get_time_ns(),
3987            None,
3988        );
3989
3990        assert!(client.request_instruments(&request).is_ok());
3991
3992        let timeout = tokio::time::Duration::from_secs(5);
3993        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3994            tokio::time::timeout(timeout, rx.recv()).await
3995        {
3996            // Verify no timestamp filters
3997            assert!(
3998                resp.start.is_none(),
3999                "start should be None when not provided"
4000            );
4001            assert!(resp.end.is_none(), "end should be None when not provided");
4002
4003            // Should still get current instruments
4004            assert_eq!(resp.venue, *DYDX_VENUE);
4005            assert_eq!(resp.client_id, client_id);
4006            assert!(resp.ts_init > 0);
4007        }
4008    }
4009
4010    #[tokio::test]
4011    async fn test_request_instrument_cache_hit() {
4012        // Test cache hit (instrument already cached)
4013        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4014        set_data_event_sender(sender);
4015
4016        let client_id = ClientId::from("DYDX-CACHE-HIT");
4017        let config = DydxDataClientConfig::default();
4018        let http_client = DydxHttpClient::default();
4019        let client =
4020            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4021
4022        // Pre-populate cache with test instrument
4023        let instrument = create_test_instrument_any();
4024        let instrument_id = instrument.id();
4025        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4026        client.instruments.insert(symbol_key, instrument.clone());
4027
4028        let request = RequestInstrument::new(
4029            instrument_id,
4030            None,
4031            None,
4032            Some(client_id),
4033            UUID4::new(),
4034            get_atomic_clock_realtime().get_time_ns(),
4035            None,
4036        );
4037
4038        assert!(client.request_instrument(&request).is_ok());
4039
4040        // Should get immediate response from cache
4041        let timeout = tokio::time::Duration::from_millis(500);
4042        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4043            tokio::time::timeout(timeout, rx.recv()).await
4044        {
4045            assert_eq!(resp.instrument_id, instrument_id);
4046            assert_eq!(resp.client_id, client_id);
4047            assert_eq!(resp.data.id(), instrument_id);
4048        }
4049    }
4050
4051    #[tokio::test]
4052    async fn test_request_instrument_cache_miss() {
4053        // Test cache miss (fetch from API)
4054        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4055        set_data_event_sender(sender);
4056
4057        let client_id = ClientId::from("DYDX-CACHE-MISS");
4058        let config = DydxDataClientConfig::default();
4059        let http_client = DydxHttpClient::default();
4060        let client =
4061            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4062
4063        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4064
4065        let request = RequestInstrument::new(
4066            instrument_id,
4067            None,
4068            None,
4069            Some(client_id),
4070            UUID4::new(),
4071            get_atomic_clock_realtime().get_time_ns(),
4072            None,
4073        );
4074
4075        assert!(client.request_instrument(&request).is_ok());
4076
4077        // Wait for async HTTP fetch and response
4078        let timeout = tokio::time::Duration::from_secs(5);
4079        let result = tokio::time::timeout(timeout, rx.recv()).await;
4080
4081        // May timeout if testnet unreachable, but should not panic
4082        match result {
4083            Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4084                assert_eq!(resp.instrument_id, instrument_id);
4085                assert_eq!(resp.client_id, client_id);
4086            }
4087            Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4088            Ok(None) => panic!("Channel closed unexpectedly"),
4089            Err(_) => {
4090                println!("Test timed out - testnet may be unreachable");
4091            }
4092        }
4093    }
4094
4095    #[tokio::test]
4096    async fn test_request_instrument_not_found() {
4097        // Test instrument not found scenario
4098        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4099        set_data_event_sender(sender);
4100
4101        let client_id = ClientId::from("DYDX-NOT-FOUND");
4102        let config = DydxDataClientConfig {
4103            base_url_http: Some("http://invalid-url.local".to_string()),
4104            ..Default::default()
4105        };
4106        let http_client = DydxHttpClient::new(
4107            config.base_url_http.clone(),
4108            config.http_timeout_secs,
4109            config.http_proxy_url.clone(),
4110            config.is_testnet,
4111            None,
4112        )
4113        .unwrap();
4114
4115        let client =
4116            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4117
4118        let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4119
4120        let request = RequestInstrument::new(
4121            instrument_id,
4122            None,
4123            None,
4124            Some(client_id),
4125            UUID4::new(),
4126            get_atomic_clock_realtime().get_time_ns(),
4127            None,
4128        );
4129
4130        // Should not panic on invalid instrument
4131        assert!(client.request_instrument(&request).is_ok());
4132
4133        // Note: No response sent when instrument not found (by design)
4134        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4135    }
4136
4137    #[tokio::test]
4138    async fn test_request_instrument_bulk_caching() {
4139        // Test bulk caching when fetching from API
4140        setup_test_env();
4141
4142        let client_id = ClientId::from("DYDX-BULK-CACHE");
4143        let config = DydxDataClientConfig::default();
4144        let http_client = DydxHttpClient::default();
4145        let client =
4146            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4147
4148        let initial_cache_size = client.instruments.len();
4149
4150        let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4151
4152        let request = RequestInstrument::new(
4153            instrument_id,
4154            None,
4155            None,
4156            Some(client_id),
4157            UUID4::new(),
4158            get_atomic_clock_realtime().get_time_ns(),
4159            None,
4160        );
4161
4162        assert!(client.request_instrument(&request).is_ok());
4163
4164        // Wait for async bulk fetch
4165        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4166
4167        // Verify cache populated with all instruments (if HTTP succeeded)
4168        let final_cache_size = client.instruments.len();
4169        assert!(final_cache_size >= initial_cache_size);
4170        // If HTTP succeeded, cache should have multiple instruments
4171    }
4172
4173    #[tokio::test]
4174    async fn test_request_instrument_correlation_id() {
4175        // Test correlation_id matching
4176        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4177        set_data_event_sender(sender);
4178
4179        let client_id = ClientId::from("DYDX-CORR-ID");
4180        let config = DydxDataClientConfig::default();
4181        let http_client = DydxHttpClient::default();
4182        let client =
4183            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4184
4185        // Pre-populate cache to get immediate response
4186        let instrument = create_test_instrument_any();
4187        let instrument_id = instrument.id();
4188        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4189        client.instruments.insert(symbol_key, instrument.clone());
4190
4191        let request_id = UUID4::new();
4192        let request = RequestInstrument::new(
4193            instrument_id,
4194            None,
4195            None,
4196            Some(client_id),
4197            request_id,
4198            get_atomic_clock_realtime().get_time_ns(),
4199            None,
4200        );
4201
4202        assert!(client.request_instrument(&request).is_ok());
4203
4204        // Verify correlation_id matches
4205        let timeout = tokio::time::Duration::from_millis(500);
4206        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4207            tokio::time::timeout(timeout, rx.recv()).await
4208        {
4209            assert_eq!(resp.correlation_id, request_id);
4210        }
4211    }
4212
4213    #[tokio::test]
4214    async fn test_request_instrument_response_format_boxed() {
4215        // Verify InstrumentResponse format (boxed)
4216        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4217        set_data_event_sender(sender);
4218
4219        let client_id = ClientId::from("DYDX-BOXED");
4220        let config = DydxDataClientConfig::default();
4221        let http_client = DydxHttpClient::default();
4222        let client =
4223            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4224
4225        // Pre-populate cache
4226        let instrument = create_test_instrument_any();
4227        let instrument_id = instrument.id();
4228        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4229        client.instruments.insert(symbol_key, instrument.clone());
4230
4231        let request = RequestInstrument::new(
4232            instrument_id,
4233            None,
4234            None,
4235            Some(client_id),
4236            UUID4::new(),
4237            get_atomic_clock_realtime().get_time_ns(),
4238            None,
4239        );
4240
4241        assert!(client.request_instrument(&request).is_ok());
4242
4243        // Verify response is properly boxed
4244        let timeout = tokio::time::Duration::from_millis(500);
4245        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4246            tokio::time::timeout(timeout, rx.recv()).await
4247        {
4248            // Verify boxed response structure
4249            assert_eq!(boxed_resp.instrument_id, instrument_id);
4250            assert_eq!(boxed_resp.client_id, client_id);
4251            assert!(boxed_resp.start.is_none());
4252            assert!(boxed_resp.end.is_none());
4253            assert!(boxed_resp.ts_init > 0);
4254        }
4255    }
4256
4257    #[rstest]
4258    fn test_request_instrument_symbol_extraction() {
4259        // Test symbol extraction from InstrumentId
4260        setup_test_env();
4261
4262        let client_id = ClientId::from("DYDX-SYMBOL");
4263        let config = DydxDataClientConfig::default();
4264        let http_client = DydxHttpClient::default();
4265        let _client =
4266            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4267
4268        // Test various instrument ID formats
4269        // Note: Symbol includes the -PERP suffix in dYdX
4270        let test_cases = vec![
4271            ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4272            ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4273            ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4274        ];
4275
4276        for (instrument_id_str, expected_symbol) in test_cases {
4277            let instrument_id = InstrumentId::from(instrument_id_str);
4278            let symbol = Ustr::from(instrument_id.symbol.as_str());
4279            assert_eq!(symbol.as_str(), expected_symbol);
4280        }
4281    }
4282
4283    #[tokio::test]
4284    async fn test_request_instrument_client_id_fallback() {
4285        // Test client_id fallback to default
4286        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4287        set_data_event_sender(sender);
4288
4289        let client_id = ClientId::from("DYDX-FALLBACK");
4290        let config = DydxDataClientConfig::default();
4291        let http_client = DydxHttpClient::default();
4292        let client =
4293            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4294
4295        // Pre-populate cache
4296        let instrument = create_test_instrument_any();
4297        let instrument_id = instrument.id();
4298        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4299        client.instruments.insert(symbol_key, instrument.clone());
4300
4301        let request = RequestInstrument::new(
4302            instrument_id,
4303            None,
4304            None,
4305            None, // No client_id provided
4306            UUID4::new(),
4307            get_atomic_clock_realtime().get_time_ns(),
4308            None,
4309        );
4310
4311        assert!(client.request_instrument(&request).is_ok());
4312
4313        // Should use client's default client_id
4314        let timeout = tokio::time::Duration::from_millis(500);
4315        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4316            tokio::time::timeout(timeout, rx.recv()).await
4317        {
4318            assert_eq!(resp.client_id, client_id);
4319        }
4320    }
4321
4322    #[tokio::test]
4323    async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4324        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4325        set_data_event_sender(sender);
4326
4327        let created_at = Utc::now();
4328
4329        let http_trade = crate::http::models::Trade {
4330            id: "trade-1".to_string(),
4331            side: OrderSide::Buy,
4332            size: dec!(1.5),
4333            price: dec!(100.25),
4334            created_at,
4335            created_at_height: 1,
4336            trade_type: crate::common::enums::DydxTradeType::Limit,
4337        };
4338
4339        let trades_response = crate::http::models::TradesResponse {
4340            trades: vec![http_trade],
4341        };
4342
4343        let state = TradesTestState {
4344            response: Arc::new(trades_response),
4345            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4346            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4347        };
4348
4349        let addr = start_trades_test_server(state.clone()).await;
4350        let base_url = format!("http://{addr}");
4351
4352        let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4353        let config = DydxDataClientConfig {
4354            base_url_http: Some(base_url),
4355            is_testnet: true,
4356            ..Default::default()
4357        };
4358
4359        let http_client = DydxHttpClient::new(
4360            config.base_url_http.clone(),
4361            config.http_timeout_secs,
4362            config.http_proxy_url.clone(),
4363            config.is_testnet,
4364            None,
4365        )
4366        .unwrap();
4367
4368        let client =
4369            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4370
4371        let instrument = create_test_instrument_any();
4372        let instrument_id = instrument.id();
4373        let price_precision = instrument.price_precision();
4374        let size_precision = instrument.size_precision();
4375        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4376        client.instruments.insert(symbol_key, instrument);
4377
4378        let request_id = UUID4::new();
4379        let now = Utc::now();
4380        let start = Some(now - chrono::Duration::seconds(10));
4381        let end = Some(now + chrono::Duration::seconds(10));
4382        let limit = std::num::NonZeroUsize::new(100).unwrap();
4383
4384        let request = RequestTrades::new(
4385            instrument_id,
4386            start,
4387            end,
4388            Some(limit),
4389            Some(client_id),
4390            request_id,
4391            get_atomic_clock_realtime().get_time_ns(),
4392            None,
4393        );
4394
4395        assert!(client.request_trades(&request).is_ok());
4396
4397        let timeout = tokio::time::Duration::from_secs(1);
4398        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4399            tokio::time::timeout(timeout, rx.recv()).await
4400        {
4401            assert_eq!(resp.correlation_id, request_id);
4402            assert_eq!(resp.client_id, client_id);
4403            assert_eq!(resp.instrument_id, instrument_id);
4404            assert_eq!(resp.data.len(), 1);
4405
4406            let tick = &resp.data[0];
4407            assert_eq!(tick.instrument_id, instrument_id);
4408            assert_eq!(tick.price, Price::new(100.25, price_precision));
4409            assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4410            assert_eq!(tick.trade_id.to_string(), "trade-1");
4411            assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4412        } else {
4413            panic!("did not receive trades response in time");
4414        }
4415
4416        // Verify symbol conversion (strip -PERP suffix) and limit propagation.
4417        let last_ticker = state.last_ticker.lock().await.clone();
4418        assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4419
4420        let last_limit = *state.last_limit.lock().await;
4421        assert_eq!(last_limit, Some(Some(100)));
4422    }
4423
4424    #[tokio::test]
4425    async fn test_request_trades_empty_response_and_no_limit() {
4426        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4427        set_data_event_sender(sender);
4428
4429        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4430
4431        let state = TradesTestState {
4432            response: Arc::new(trades_response),
4433            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4434            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4435        };
4436
4437        let addr = start_trades_test_server(state.clone()).await;
4438        let base_url = format!("http://{addr}");
4439
4440        let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4441        let config = DydxDataClientConfig {
4442            base_url_http: Some(base_url),
4443            is_testnet: true,
4444            ..Default::default()
4445        };
4446
4447        let http_client = DydxHttpClient::new(
4448            config.base_url_http.clone(),
4449            config.http_timeout_secs,
4450            config.http_proxy_url.clone(),
4451            config.is_testnet,
4452            None,
4453        )
4454        .unwrap();
4455
4456        let client =
4457            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4458
4459        let instrument = create_test_instrument_any();
4460        let instrument_id = instrument.id();
4461        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4462        client.instruments.insert(symbol_key, instrument);
4463
4464        let request_id = UUID4::new();
4465
4466        let request = RequestTrades::new(
4467            instrument_id,
4468            None,
4469            None,
4470            None, // No limit
4471            Some(client_id),
4472            request_id,
4473            get_atomic_clock_realtime().get_time_ns(),
4474            None,
4475        );
4476
4477        assert!(client.request_trades(&request).is_ok());
4478
4479        let timeout = tokio::time::Duration::from_secs(1);
4480        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4481            tokio::time::timeout(timeout, rx.recv()).await
4482        {
4483            assert_eq!(resp.correlation_id, request_id);
4484            assert_eq!(resp.client_id, client_id);
4485            assert_eq!(resp.instrument_id, instrument_id);
4486            assert!(resp.data.is_empty());
4487        } else {
4488            panic!("did not receive trades response in time");
4489        }
4490
4491        // Verify that no `limit` query parameter was sent.
4492        let last_limit = *state.last_limit.lock().await;
4493        assert_eq!(last_limit, Some(None));
4494    }
4495
4496    #[tokio::test]
4497    async fn test_request_trades_timestamp_filtering() {
4498        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4499        set_data_event_sender(sender);
4500
4501        let now = Utc::now();
4502        let trade_before = crate::http::models::Trade {
4503            id: "before".to_string(),
4504            side: OrderSide::Buy,
4505            size: dec!(1.0),
4506            price: dec!(100.0),
4507            created_at: now - chrono::Duration::seconds(60),
4508            created_at_height: 1,
4509            trade_type: crate::common::enums::DydxTradeType::Limit,
4510        };
4511        let trade_inside = crate::http::models::Trade {
4512            id: "inside".to_string(),
4513            side: OrderSide::Sell,
4514            size: dec!(2.0),
4515            price: dec!(101.0),
4516            created_at: now,
4517            created_at_height: 2,
4518            trade_type: crate::common::enums::DydxTradeType::Limit,
4519        };
4520        let trade_after = crate::http::models::Trade {
4521            id: "after".to_string(),
4522            side: OrderSide::Buy,
4523            size: dec!(3.0),
4524            price: dec!(102.0),
4525            created_at: now + chrono::Duration::seconds(60),
4526            created_at_height: 3,
4527            trade_type: crate::common::enums::DydxTradeType::Limit,
4528        };
4529
4530        let trades_response = crate::http::models::TradesResponse {
4531            trades: vec![trade_before, trade_inside.clone(), trade_after],
4532        };
4533
4534        let state = TradesTestState {
4535            response: Arc::new(trades_response),
4536            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4537            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4538        };
4539
4540        let addr = start_trades_test_server(state).await;
4541        let base_url = format!("http://{addr}");
4542
4543        let client_id = ClientId::from("DYDX-TRADES-FILTER");
4544        let config = DydxDataClientConfig {
4545            base_url_http: Some(base_url),
4546            is_testnet: true,
4547            ..Default::default()
4548        };
4549
4550        let http_client = DydxHttpClient::new(
4551            config.base_url_http.clone(),
4552            config.http_timeout_secs,
4553            config.http_proxy_url.clone(),
4554            config.is_testnet,
4555            None,
4556        )
4557        .unwrap();
4558
4559        let client =
4560            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4561
4562        let instrument = create_test_instrument_any();
4563        let instrument_id = instrument.id();
4564        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4565        client.instruments.insert(symbol_key, instrument);
4566
4567        let request_id = UUID4::new();
4568
4569        // Filter range includes only the "inside" trade.
4570        let start = Some(now - chrono::Duration::seconds(10));
4571        let end = Some(now + chrono::Duration::seconds(10));
4572
4573        let request = RequestTrades::new(
4574            instrument_id,
4575            start,
4576            end,
4577            None,
4578            Some(client_id),
4579            request_id,
4580            get_atomic_clock_realtime().get_time_ns(),
4581            None,
4582        );
4583
4584        assert!(client.request_trades(&request).is_ok());
4585
4586        let timeout = tokio::time::Duration::from_secs(1);
4587        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4588            tokio::time::timeout(timeout, rx.recv()).await
4589        {
4590            assert_eq!(resp.correlation_id, request_id);
4591            assert_eq!(resp.client_id, client_id);
4592            assert_eq!(resp.instrument_id, instrument_id);
4593            assert_eq!(resp.data.len(), 1);
4594
4595            let tick = &resp.data[0];
4596            assert_eq!(tick.trade_id.to_string(), "inside");
4597            assert_eq!(tick.price.as_decimal(), dec!(101.0));
4598        } else {
4599            panic!("did not receive trades response in time");
4600        }
4601    }
4602
4603    #[tokio::test]
4604    async fn test_request_trades_correlation_id_matching() {
4605        // Test correlation_id matching in response
4606        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4607        set_data_event_sender(sender);
4608
4609        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4610
4611        let state = TradesTestState {
4612            response: Arc::new(trades_response),
4613            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4614            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4615        };
4616
4617        let addr = start_trades_test_server(state).await;
4618        let base_url = format!("http://{addr}");
4619
4620        let client_id = ClientId::from("DYDX-TRADES-CORR");
4621        let config = DydxDataClientConfig {
4622            base_url_http: Some(base_url),
4623            is_testnet: true,
4624            ..Default::default()
4625        };
4626
4627        let http_client = DydxHttpClient::new(
4628            config.base_url_http.clone(),
4629            config.http_timeout_secs,
4630            config.http_proxy_url.clone(),
4631            config.is_testnet,
4632            None,
4633        )
4634        .unwrap();
4635
4636        let client =
4637            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4638
4639        let instrument = create_test_instrument_any();
4640        let instrument_id = instrument.id();
4641        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4642        client.instruments.insert(symbol_key, instrument);
4643
4644        let request_id = UUID4::new();
4645        let request = RequestTrades::new(
4646            instrument_id,
4647            None,
4648            None,
4649            None,
4650            Some(client_id),
4651            request_id,
4652            get_atomic_clock_realtime().get_time_ns(),
4653            None,
4654        );
4655
4656        assert!(client.request_trades(&request).is_ok());
4657
4658        let timeout = tokio::time::Duration::from_millis(500);
4659        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4660            tokio::time::timeout(timeout, rx.recv()).await
4661        {
4662            assert_eq!(resp.correlation_id, request_id);
4663        }
4664    }
4665
4666    #[tokio::test]
4667    async fn test_request_trades_response_format() {
4668        // Verify TradesResponse format
4669        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4670        set_data_event_sender(sender);
4671
4672        let created_at = Utc::now();
4673        let http_trade = crate::http::models::Trade {
4674            id: "format-test".to_string(),
4675            side: OrderSide::Sell,
4676            size: dec!(5.0),
4677            price: dec!(200.0),
4678            created_at,
4679            created_at_height: 100,
4680            trade_type: crate::common::enums::DydxTradeType::Limit,
4681        };
4682
4683        let trades_response = crate::http::models::TradesResponse {
4684            trades: vec![http_trade],
4685        };
4686
4687        let state = TradesTestState {
4688            response: Arc::new(trades_response),
4689            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4690            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4691        };
4692
4693        let addr = start_trades_test_server(state).await;
4694        let base_url = format!("http://{addr}");
4695
4696        let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4697        let config = DydxDataClientConfig {
4698            base_url_http: Some(base_url),
4699            is_testnet: true,
4700            ..Default::default()
4701        };
4702
4703        let http_client = DydxHttpClient::new(
4704            config.base_url_http.clone(),
4705            config.http_timeout_secs,
4706            config.http_proxy_url.clone(),
4707            config.is_testnet,
4708            None,
4709        )
4710        .unwrap();
4711
4712        let client =
4713            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4714
4715        let instrument = create_test_instrument_any();
4716        let instrument_id = instrument.id();
4717        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4718        client.instruments.insert(symbol_key, instrument);
4719
4720        let request = RequestTrades::new(
4721            instrument_id,
4722            None,
4723            None,
4724            None,
4725            Some(client_id),
4726            UUID4::new(),
4727            get_atomic_clock_realtime().get_time_ns(),
4728            None,
4729        );
4730
4731        assert!(client.request_trades(&request).is_ok());
4732
4733        let timeout = tokio::time::Duration::from_millis(500);
4734        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4735            tokio::time::timeout(timeout, rx.recv()).await
4736        {
4737            // Verify response structure
4738            assert_eq!(resp.client_id, client_id);
4739            assert_eq!(resp.instrument_id, instrument_id);
4740            assert!(resp.data.len() == 1);
4741            assert!(resp.ts_init > 0);
4742
4743            // Verify trade tick structure
4744            let tick = &resp.data[0];
4745            assert_eq!(tick.instrument_id, instrument_id);
4746            assert!(tick.ts_event > 0);
4747            assert!(tick.ts_init > 0);
4748        }
4749    }
4750
4751    #[tokio::test]
4752    async fn test_request_trades_no_instrument_in_cache() {
4753        // Test empty response when instrument not in cache
4754        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4755        set_data_event_sender(sender);
4756
4757        let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4758        let config = DydxDataClientConfig::default();
4759        let http_client = DydxHttpClient::default();
4760        let client =
4761            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4762
4763        // Don't add instrument to cache
4764        let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4765
4766        let request = RequestTrades::new(
4767            instrument_id,
4768            None,
4769            None,
4770            None,
4771            Some(client_id),
4772            UUID4::new(),
4773            get_atomic_clock_realtime().get_time_ns(),
4774            None,
4775        );
4776
4777        assert!(client.request_trades(&request).is_ok());
4778
4779        // Should receive empty response when instrument not found
4780        let timeout = tokio::time::Duration::from_millis(500);
4781        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4782            tokio::time::timeout(timeout, rx.recv()).await
4783        {
4784            assert!(resp.data.is_empty());
4785        }
4786    }
4787
4788    #[tokio::test]
4789    async fn test_request_trades_limit_parameter() {
4790        // Test limit parameter handling
4791        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4792        set_data_event_sender(sender);
4793
4794        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4795
4796        let state = TradesTestState {
4797            response: Arc::new(trades_response),
4798            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4799            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4800        };
4801
4802        let addr = start_trades_test_server(state.clone()).await;
4803        let base_url = format!("http://{addr}");
4804
4805        let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4806        let config = DydxDataClientConfig {
4807            base_url_http: Some(base_url),
4808            is_testnet: true,
4809            ..Default::default()
4810        };
4811
4812        let http_client = DydxHttpClient::new(
4813            config.base_url_http.clone(),
4814            config.http_timeout_secs,
4815            config.http_proxy_url.clone(),
4816            config.is_testnet,
4817            None,
4818        )
4819        .unwrap();
4820
4821        let client =
4822            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4823
4824        let instrument = create_test_instrument_any();
4825        let instrument_id = instrument.id();
4826        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4827        client.instruments.insert(symbol_key, instrument);
4828
4829        // Test with limit
4830        let limit = std::num::NonZeroUsize::new(500).unwrap();
4831        let request = RequestTrades::new(
4832            instrument_id,
4833            None,
4834            None,
4835            Some(limit),
4836            Some(client_id),
4837            UUID4::new(),
4838            get_atomic_clock_realtime().get_time_ns(),
4839            None,
4840        );
4841
4842        assert!(client.request_trades(&request).is_ok());
4843
4844        let state_clone = state.clone();
4845        wait_until_async(
4846            || async { state_clone.last_limit.lock().await.is_some() },
4847            Duration::from_secs(5),
4848        )
4849        .await;
4850
4851        // Verify limit was passed to HTTP client
4852        let last_limit = *state.last_limit.lock().await;
4853        assert_eq!(last_limit, Some(Some(500)));
4854    }
4855
4856    #[rstest]
4857    fn test_request_trades_symbol_conversion() {
4858        // Test symbol conversion (strip -PERP suffix)
4859        setup_test_env();
4860
4861        let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4862        let config = DydxDataClientConfig::default();
4863        let http_client = DydxHttpClient::default();
4864        let _client =
4865            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4866
4867        // Verify symbol format for various instruments
4868        let test_cases = vec![
4869            ("BTC-USD-PERP.DYDX", "BTC-USD"),
4870            ("ETH-USD-PERP.DYDX", "ETH-USD"),
4871            ("SOL-USD-PERP.DYDX", "SOL-USD"),
4872        ];
4873
4874        for (instrument_id_str, expected_ticker) in test_cases {
4875            let instrument_id = InstrumentId::from(instrument_id_str);
4876            let ticker = instrument_id
4877                .symbol
4878                .as_str()
4879                .trim_end_matches("-PERP")
4880                .to_string();
4881            assert_eq!(ticker, expected_ticker);
4882        }
4883    }
4884
4885    #[tokio::test]
4886    async fn test_http_404_handling() {
4887        // Test HTTP 404 handling (instrument not found)
4888        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4889        set_data_event_sender(sender);
4890
4891        let client_id = ClientId::from("DYDX-404");
4892        let config = DydxDataClientConfig {
4893            base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4894            http_timeout_secs: Some(1),
4895            ..Default::default()
4896        };
4897
4898        let http_client = DydxHttpClient::new(
4899            config.base_url_http.clone(),
4900            config.http_timeout_secs,
4901            config.http_proxy_url.clone(),
4902            config.is_testnet,
4903            None,
4904        )
4905        .unwrap();
4906
4907        let client =
4908            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4909
4910        let request = RequestInstruments::new(
4911            None,
4912            None,
4913            Some(client_id),
4914            Some(*DYDX_VENUE),
4915            UUID4::new(),
4916            get_atomic_clock_realtime().get_time_ns(),
4917            None,
4918        );
4919
4920        assert!(client.request_instruments(&request).is_ok());
4921
4922        // Should receive empty response on 404
4923        let timeout = tokio::time::Duration::from_secs(2);
4924        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4925            tokio::time::timeout(timeout, rx.recv()).await
4926        {
4927            assert!(resp.data.is_empty(), "Expected empty response on 404");
4928        }
4929    }
4930
4931    #[tokio::test]
4932    async fn test_http_500_handling() {
4933        // Test HTTP 500 handling (internal server error)
4934        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4935        set_data_event_sender(sender);
4936
4937        let client_id = ClientId::from("DYDX-500");
4938        let config = DydxDataClientConfig {
4939            base_url_http: Some("http://httpstat.us/500".to_string()),
4940            http_timeout_secs: Some(2),
4941            ..Default::default()
4942        };
4943
4944        let http_client = DydxHttpClient::new(
4945            config.base_url_http.clone(),
4946            config.http_timeout_secs,
4947            config.http_proxy_url.clone(),
4948            config.is_testnet,
4949            None,
4950        )
4951        .unwrap();
4952
4953        let client =
4954            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4955
4956        let instrument = create_test_instrument_any();
4957        let instrument_id = instrument.id();
4958        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4959        client.instruments.insert(symbol_key, instrument);
4960
4961        let request = RequestTrades::new(
4962            instrument_id,
4963            None,
4964            None,
4965            None,
4966            Some(client_id),
4967            UUID4::new(),
4968            get_atomic_clock_realtime().get_time_ns(),
4969            None,
4970        );
4971
4972        assert!(client.request_trades(&request).is_ok());
4973
4974        // Should receive empty response on 500 error
4975        let timeout = tokio::time::Duration::from_secs(3);
4976        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4977            tokio::time::timeout(timeout, rx.recv()).await
4978        {
4979            assert!(resp.data.is_empty(), "Expected empty response on 500");
4980        }
4981    }
4982
4983    #[tokio::test]
4984    async fn test_network_timeout_handling() {
4985        // Test network timeout scenarios
4986        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4987        set_data_event_sender(sender);
4988
4989        let client_id = ClientId::from("DYDX-TIMEOUT");
4990        let config = DydxDataClientConfig {
4991            base_url_http: Some("http://10.255.255.1:81".to_string()), // Non-routable IP
4992            http_timeout_secs: Some(1),                                // Very short timeout
4993            ..Default::default()
4994        };
4995
4996        let http_client = DydxHttpClient::new(
4997            config.base_url_http.clone(),
4998            config.http_timeout_secs,
4999            config.http_proxy_url.clone(),
5000            config.is_testnet,
5001            None,
5002        )
5003        .unwrap();
5004
5005        let client =
5006            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5007
5008        let request = RequestInstruments::new(
5009            None,
5010            None,
5011            Some(client_id),
5012            Some(*DYDX_VENUE),
5013            UUID4::new(),
5014            get_atomic_clock_realtime().get_time_ns(),
5015            None,
5016        );
5017
5018        assert!(client.request_instruments(&request).is_ok());
5019
5020        // Should timeout and return empty response
5021        let timeout = tokio::time::Duration::from_secs(3);
5022        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5023            tokio::time::timeout(timeout, rx.recv()).await
5024        {
5025            assert!(resp.data.is_empty(), "Expected empty response on timeout");
5026        }
5027    }
5028
5029    #[tokio::test]
5030    async fn test_connection_refused_handling() {
5031        // Test connection refused errors
5032        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5033        set_data_event_sender(sender);
5034
5035        let client_id = ClientId::from("DYDX-REFUSED");
5036        let config = DydxDataClientConfig {
5037            base_url_http: Some("http://localhost:9999".to_string()), // Port unlikely to be open
5038            http_timeout_secs: Some(1),
5039            ..Default::default()
5040        };
5041
5042        let http_client = DydxHttpClient::new(
5043            config.base_url_http.clone(),
5044            config.http_timeout_secs,
5045            config.http_proxy_url.clone(),
5046            config.is_testnet,
5047            None,
5048        )
5049        .unwrap();
5050
5051        let client =
5052            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5053
5054        let instrument = create_test_instrument_any();
5055        let instrument_id = instrument.id();
5056        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5057        client.instruments.insert(symbol_key, instrument);
5058
5059        let request = RequestInstrument::new(
5060            instrument_id,
5061            None,
5062            None,
5063            Some(client_id),
5064            UUID4::new(),
5065            get_atomic_clock_realtime().get_time_ns(),
5066            None,
5067        );
5068
5069        assert!(client.request_instrument(&request).is_ok());
5070
5071        // Should handle connection refused gracefully
5072        let timeout = tokio::time::Duration::from_secs(2);
5073        let result = tokio::time::timeout(timeout, rx.recv()).await;
5074
5075        // May not receive response if connection fails before spawning handler
5076        // This is acceptable - the important part is no panic
5077        match result {
5078            Ok(Some(DataEvent::Response(_))) => {
5079                // Response received (empty data expected)
5080            }
5081            Ok(None) | Err(_) => {
5082                // No response or timeout - acceptable for connection refused
5083            }
5084            _ => {}
5085        }
5086    }
5087
5088    #[tokio::test]
5089    async fn test_dns_resolution_failure_handling() {
5090        // Test DNS resolution failures
5091        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5092        set_data_event_sender(sender);
5093
5094        let client_id = ClientId::from("DYDX-DNS");
5095        let config = DydxDataClientConfig {
5096            base_url_http: Some(
5097                "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5098            ),
5099            http_timeout_secs: Some(2),
5100            ..Default::default()
5101        };
5102
5103        let http_client = DydxHttpClient::new(
5104            config.base_url_http.clone(),
5105            config.http_timeout_secs,
5106            config.http_proxy_url.clone(),
5107            config.is_testnet,
5108            None,
5109        )
5110        .unwrap();
5111
5112        let client =
5113            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5114
5115        let request = RequestInstruments::new(
5116            None,
5117            None,
5118            Some(client_id),
5119            Some(*DYDX_VENUE),
5120            UUID4::new(),
5121            get_atomic_clock_realtime().get_time_ns(),
5122            None,
5123        );
5124
5125        assert!(client.request_instruments(&request).is_ok());
5126
5127        // Should handle DNS failure gracefully with empty response
5128        let timeout = tokio::time::Duration::from_secs(3);
5129        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5130            tokio::time::timeout(timeout, rx.recv()).await
5131        {
5132            assert!(
5133                resp.data.is_empty(),
5134                "Expected empty response on DNS failure"
5135            );
5136        }
5137    }
5138
5139    #[tokio::test]
5140    async fn test_http_502_503_handling() {
5141        // Test HTTP 502/503 handling (bad gateway/service unavailable)
5142        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5143        set_data_event_sender(sender);
5144
5145        let client_id = ClientId::from("DYDX-503");
5146        let config = DydxDataClientConfig {
5147            base_url_http: Some("http://httpstat.us/503".to_string()),
5148            http_timeout_secs: Some(2),
5149            ..Default::default()
5150        };
5151
5152        let http_client = DydxHttpClient::new(
5153            config.base_url_http.clone(),
5154            config.http_timeout_secs,
5155            config.http_proxy_url.clone(),
5156            config.is_testnet,
5157            None,
5158        )
5159        .unwrap();
5160
5161        let client =
5162            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5163
5164        let request = RequestInstruments::new(
5165            None,
5166            None,
5167            Some(client_id),
5168            Some(*DYDX_VENUE),
5169            UUID4::new(),
5170            get_atomic_clock_realtime().get_time_ns(),
5171            None,
5172        );
5173
5174        assert!(client.request_instruments(&request).is_ok());
5175
5176        // Should handle 503 gracefully with empty response
5177        let timeout = tokio::time::Duration::from_secs(3);
5178        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5179            tokio::time::timeout(timeout, rx.recv()).await
5180        {
5181            assert!(resp.data.is_empty(), "Expected empty response on 503");
5182        }
5183    }
5184
5185    #[tokio::test]
5186    async fn test_http_429_rate_limit_handling() {
5187        // Test HTTP 429 handling (rate limit exceeded)
5188        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5189        set_data_event_sender(sender);
5190
5191        let client_id = ClientId::from("DYDX-429");
5192        let config = DydxDataClientConfig {
5193            base_url_http: Some("http://httpstat.us/429".to_string()),
5194            http_timeout_secs: Some(2),
5195            ..Default::default()
5196        };
5197
5198        let http_client = DydxHttpClient::new(
5199            config.base_url_http.clone(),
5200            config.http_timeout_secs,
5201            config.http_proxy_url.clone(),
5202            config.is_testnet,
5203            None,
5204        )
5205        .unwrap();
5206
5207        let client =
5208            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5209
5210        let instrument = create_test_instrument_any();
5211        let instrument_id = instrument.id();
5212        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5213        client.instruments.insert(symbol_key, instrument);
5214
5215        let request = RequestTrades::new(
5216            instrument_id,
5217            None,
5218            None,
5219            None,
5220            Some(client_id),
5221            UUID4::new(),
5222            get_atomic_clock_realtime().get_time_ns(),
5223            None,
5224        );
5225
5226        assert!(client.request_trades(&request).is_ok());
5227
5228        // Should handle rate limit with empty response
5229        let timeout = tokio::time::Duration::from_secs(3);
5230        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5231            tokio::time::timeout(timeout, rx.recv()).await
5232        {
5233            assert!(
5234                resp.data.is_empty(),
5235                "Expected empty response on rate limit"
5236            );
5237        }
5238    }
5239
5240    #[tokio::test]
5241    async fn test_error_handling_does_not_panic() {
5242        // Test that error scenarios don't cause panics
5243        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5244        set_data_event_sender(sender);
5245
5246        let client_id = ClientId::from("DYDX-NO-PANIC");
5247        let config = DydxDataClientConfig {
5248            base_url_http: Some("http://invalid".to_string()),
5249            ..Default::default()
5250        };
5251
5252        let http_client = DydxHttpClient::new(
5253            config.base_url_http.clone(),
5254            config.http_timeout_secs,
5255            config.http_proxy_url.clone(),
5256            config.is_testnet,
5257            None,
5258        )
5259        .unwrap();
5260
5261        let client =
5262            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5263
5264        // All these should return Ok() without panicking
5265        let request_instruments = RequestInstruments::new(
5266            None,
5267            None,
5268            Some(client_id),
5269            Some(*DYDX_VENUE),
5270            UUID4::new(),
5271            get_atomic_clock_realtime().get_time_ns(),
5272            None,
5273        );
5274        assert!(client.request_instruments(&request_instruments).is_ok());
5275
5276        let instrument_id = InstrumentId::from("INVALID.DYDX");
5277        let request_instrument = RequestInstrument::new(
5278            instrument_id,
5279            None,
5280            None,
5281            Some(client_id),
5282            UUID4::new(),
5283            get_atomic_clock_realtime().get_time_ns(),
5284            None,
5285        );
5286        assert!(client.request_instrument(&request_instrument).is_ok());
5287
5288        let request_trades = RequestTrades::new(
5289            instrument_id,
5290            None,
5291            None,
5292            None,
5293            Some(client_id),
5294            UUID4::new(),
5295            get_atomic_clock_realtime().get_time_ns(),
5296            None,
5297        );
5298        assert!(client.request_trades(&request_trades).is_ok());
5299    }
5300
5301    #[tokio::test]
5302    async fn test_malformed_json_response() {
5303        // Test handling of malformed JSON from API
5304        use axum::{Router, routing::get};
5305
5306        #[derive(Clone)]
5307        struct MalformedState;
5308
5309        async fn malformed_markets_handler() -> String {
5310            // Invalid JSON - missing closing brace
5311            r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5312        }
5313
5314        let app = Router::new()
5315            .route("/v4/markets", get(malformed_markets_handler))
5316            .with_state(MalformedState);
5317
5318        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5319        let server_addr = listener.local_addr().unwrap();
5320        let port = server_addr.port();
5321
5322        tokio::spawn(async move {
5323            axum::serve(listener, app).await.unwrap();
5324        });
5325
5326        wait_for_server(server_addr).await;
5327
5328        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5329        set_data_event_sender(sender);
5330
5331        let client_id = ClientId::from("DYDX-MALFORMED");
5332        let config = DydxDataClientConfig {
5333            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5334            http_timeout_secs: Some(2),
5335            ..Default::default()
5336        };
5337
5338        let http_client = DydxHttpClient::new(
5339            config.base_url_http.clone(),
5340            config.http_timeout_secs,
5341            config.http_proxy_url.clone(),
5342            config.is_testnet,
5343            None,
5344        )
5345        .unwrap();
5346
5347        let client =
5348            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5349
5350        let request = RequestInstruments::new(
5351            None,
5352            None,
5353            Some(client_id),
5354            Some(*DYDX_VENUE),
5355            UUID4::new(),
5356            get_atomic_clock_realtime().get_time_ns(),
5357            None,
5358        );
5359
5360        assert!(client.request_instruments(&request).is_ok());
5361
5362        // Should handle malformed JSON gracefully with empty response
5363        let timeout = tokio::time::Duration::from_secs(3);
5364        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5365            tokio::time::timeout(timeout, rx.recv()).await
5366        {
5367            assert!(
5368                resp.data.is_empty(),
5369                "Expected empty response on malformed JSON"
5370            );
5371        }
5372    }
5373
5374    #[tokio::test]
5375    async fn test_missing_required_fields_in_response() {
5376        // Test handling when API response missing required fields
5377        use axum::{Json, Router, routing::get};
5378        use serde_json::{Value, json};
5379
5380        #[derive(Clone)]
5381        struct MissingFieldsState;
5382
5383        async fn missing_fields_handler() -> Json<Value> {
5384            // Missing critical fields like "ticker", "stepSize", etc.
5385            Json(json!({
5386                "markets": {
5387                    "BTC-USD": {
5388                        // Missing "ticker"
5389                        "status": "ACTIVE",
5390                        "baseAsset": "BTC",
5391                        "quoteAsset": "USD",
5392                        // Missing "stepSize", "tickSize", "minOrderSize"
5393                    }
5394                }
5395            }))
5396        }
5397
5398        let app = Router::new()
5399            .route("/v4/markets", get(missing_fields_handler))
5400            .with_state(MissingFieldsState);
5401
5402        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5403        let server_addr = listener.local_addr().unwrap();
5404        let port = server_addr.port();
5405
5406        tokio::spawn(async move {
5407            axum::serve(listener, app).await.unwrap();
5408        });
5409
5410        wait_for_server(server_addr).await;
5411
5412        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5413        set_data_event_sender(sender);
5414
5415        let client_id = ClientId::from("DYDX-MISSING");
5416        let config = DydxDataClientConfig {
5417            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5418            http_timeout_secs: Some(2),
5419            ..Default::default()
5420        };
5421
5422        let http_client = DydxHttpClient::new(
5423            config.base_url_http.clone(),
5424            config.http_timeout_secs,
5425            config.http_proxy_url.clone(),
5426            config.is_testnet,
5427            None,
5428        )
5429        .unwrap();
5430
5431        let client =
5432            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5433
5434        let request = RequestInstruments::new(
5435            None,
5436            None,
5437            Some(client_id),
5438            Some(*DYDX_VENUE),
5439            UUID4::new(),
5440            get_atomic_clock_realtime().get_time_ns(),
5441            None,
5442        );
5443
5444        assert!(client.request_instruments(&request).is_ok());
5445
5446        // Should handle missing fields gracefully (may skip instruments or return empty)
5447        let timeout = tokio::time::Duration::from_secs(3);
5448        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5449            tokio::time::timeout(timeout, rx.recv()).await
5450        {
5451            // Parse errors should result in empty or partial response
5452            // The important part is no panic
5453            assert!(resp.correlation_id == request.request_id);
5454        }
5455    }
5456
5457    #[tokio::test]
5458    async fn test_invalid_data_types_in_response() {
5459        // Test handling when API returns wrong data types
5460        use axum::{Json, Router, routing::get};
5461        use serde_json::{Value, json};
5462
5463        #[derive(Clone)]
5464        struct InvalidTypesState;
5465
5466        async fn invalid_types_handler() -> Json<Value> {
5467            // Wrong data types - strings instead of numbers, etc.
5468            Json(json!({
5469                "markets": {
5470                    "BTC-USD": {
5471                        "ticker": "BTC-USD",
5472                        "status": "ACTIVE",
5473                        "baseAsset": "BTC",
5474                        "quoteAsset": "USD",
5475                        "stepSize": "not_a_number",  // Should be numeric
5476                        "tickSize": true,  // Should be numeric
5477                        "minOrderSize": ["array"],  // Should be numeric
5478                        "market": 12345,  // Should be string
5479                    }
5480                }
5481            }))
5482        }
5483
5484        let app = Router::new()
5485            .route("/v4/markets", get(invalid_types_handler))
5486            .with_state(InvalidTypesState);
5487
5488        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5489        let server_addr = listener.local_addr().unwrap();
5490        let port = server_addr.port();
5491
5492        tokio::spawn(async move {
5493            axum::serve(listener, app).await.unwrap();
5494        });
5495
5496        wait_for_server(server_addr).await;
5497
5498        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5499        set_data_event_sender(sender);
5500
5501        let client_id = ClientId::from("DYDX-TYPES");
5502        let config = DydxDataClientConfig {
5503            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5504            http_timeout_secs: Some(2),
5505            ..Default::default()
5506        };
5507
5508        let http_client = DydxHttpClient::new(
5509            config.base_url_http.clone(),
5510            config.http_timeout_secs,
5511            config.http_proxy_url.clone(),
5512            config.is_testnet,
5513            None,
5514        )
5515        .unwrap();
5516
5517        let client =
5518            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5519
5520        let request = RequestInstruments::new(
5521            None,
5522            None,
5523            Some(client_id),
5524            Some(*DYDX_VENUE),
5525            UUID4::new(),
5526            get_atomic_clock_realtime().get_time_ns(),
5527            None,
5528        );
5529
5530        assert!(client.request_instruments(&request).is_ok());
5531
5532        // Should handle type errors gracefully
5533        let timeout = tokio::time::Duration::from_secs(3);
5534        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5535            tokio::time::timeout(timeout, rx.recv()).await
5536        {
5537            // Type mismatch should result in parse failure and empty/partial response
5538            assert!(resp.correlation_id == request.request_id);
5539        }
5540    }
5541
5542    #[tokio::test]
5543    async fn test_unexpected_response_structure() {
5544        // Test handling when API response has completely unexpected structure
5545        use axum::{Json, Router, routing::get};
5546        use serde_json::{Value, json};
5547
5548        #[derive(Clone)]
5549        struct UnexpectedState;
5550
5551        async fn unexpected_structure_handler() -> Json<Value> {
5552            // Completely different structure than expected
5553            Json(json!({
5554                "error": "Something went wrong",
5555                "code": 500,
5556                "data": null,
5557                "unexpected_field": {
5558                    "nested": {
5559                        "deeply": [1, 2, 3]
5560                    }
5561                }
5562            }))
5563        }
5564
5565        let app = Router::new()
5566            .route("/v4/markets", get(unexpected_structure_handler))
5567            .with_state(UnexpectedState);
5568
5569        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5570        let server_addr = listener.local_addr().unwrap();
5571        let port = server_addr.port();
5572
5573        tokio::spawn(async move {
5574            axum::serve(listener, app).await.unwrap();
5575        });
5576
5577        wait_for_server(server_addr).await;
5578
5579        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5580        set_data_event_sender(sender);
5581
5582        let client_id = ClientId::from("DYDX-STRUCT");
5583        let config = DydxDataClientConfig {
5584            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5585            http_timeout_secs: Some(2),
5586            ..Default::default()
5587        };
5588
5589        let http_client = DydxHttpClient::new(
5590            config.base_url_http.clone(),
5591            config.http_timeout_secs,
5592            config.http_proxy_url.clone(),
5593            config.is_testnet,
5594            None,
5595        )
5596        .unwrap();
5597
5598        let client =
5599            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5600
5601        let request = RequestInstruments::new(
5602            None,
5603            None,
5604            Some(client_id),
5605            Some(*DYDX_VENUE),
5606            UUID4::new(),
5607            get_atomic_clock_realtime().get_time_ns(),
5608            None,
5609        );
5610
5611        assert!(client.request_instruments(&request).is_ok());
5612
5613        // Should handle unexpected structure gracefully with empty response
5614        let timeout = tokio::time::Duration::from_secs(3);
5615        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5616            tokio::time::timeout(timeout, rx.recv()).await
5617        {
5618            assert!(
5619                resp.data.is_empty(),
5620                "Expected empty response on unexpected structure"
5621            );
5622            assert!(resp.correlation_id == request.request_id);
5623        }
5624    }
5625
5626    #[tokio::test]
5627    async fn test_empty_markets_object_in_response() {
5628        // Test handling when markets object is empty (valid JSON but no data)
5629        use axum::{Json, Router, routing::get};
5630        use serde_json::{Value, json};
5631
5632        #[derive(Clone)]
5633        struct EmptyMarketsState;
5634
5635        async fn empty_markets_handler() -> Json<Value> {
5636            Json(json!({
5637                "markets": {}
5638            }))
5639        }
5640
5641        let app = Router::new()
5642            .route("/v4/markets", get(empty_markets_handler))
5643            .with_state(EmptyMarketsState);
5644
5645        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5646        let server_addr = listener.local_addr().unwrap();
5647        let port = server_addr.port();
5648
5649        tokio::spawn(async move {
5650            axum::serve(listener, app).await.unwrap();
5651        });
5652
5653        wait_for_server(server_addr).await;
5654
5655        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5656        set_data_event_sender(sender);
5657
5658        let client_id = ClientId::from("DYDX-EMPTY");
5659        let config = DydxDataClientConfig {
5660            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5661            http_timeout_secs: Some(2),
5662            ..Default::default()
5663        };
5664
5665        let http_client = DydxHttpClient::new(
5666            config.base_url_http.clone(),
5667            config.http_timeout_secs,
5668            config.http_proxy_url.clone(),
5669            config.is_testnet,
5670            None,
5671        )
5672        .unwrap();
5673
5674        let client =
5675            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5676
5677        let request = RequestInstruments::new(
5678            None,
5679            None,
5680            Some(client_id),
5681            Some(*DYDX_VENUE),
5682            UUID4::new(),
5683            get_atomic_clock_realtime().get_time_ns(),
5684            None,
5685        );
5686
5687        assert!(client.request_instruments(&request).is_ok());
5688
5689        // Should handle empty markets gracefully
5690        let timeout = tokio::time::Duration::from_secs(3);
5691        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5692            tokio::time::timeout(timeout, rx.recv()).await
5693        {
5694            assert!(
5695                resp.data.is_empty(),
5696                "Expected empty response for empty markets"
5697            );
5698            assert!(resp.correlation_id == request.request_id);
5699        }
5700    }
5701
5702    #[tokio::test]
5703    async fn test_null_values_in_response() {
5704        // Test handling of null values in critical fields
5705        use axum::{Json, Router, routing::get};
5706        use serde_json::{Value, json};
5707
5708        #[derive(Clone)]
5709        struct NullValuesState;
5710
5711        async fn null_values_handler() -> Json<Value> {
5712            Json(json!({
5713                "markets": {
5714                    "BTC-USD": {
5715                        "ticker": null,
5716                        "status": "ACTIVE",
5717                        "baseAsset": null,
5718                        "quoteAsset": "USD",
5719                        "stepSize": null,
5720                    }
5721                }
5722            }))
5723        }
5724
5725        let app = Router::new()
5726            .route("/v4/markets", get(null_values_handler))
5727            .with_state(NullValuesState);
5728
5729        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5730        let server_addr = listener.local_addr().unwrap();
5731        let port = server_addr.port();
5732
5733        tokio::spawn(async move {
5734            axum::serve(listener, app).await.unwrap();
5735        });
5736
5737        wait_for_server(server_addr).await;
5738
5739        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5740        set_data_event_sender(sender);
5741
5742        let client_id = ClientId::from("DYDX-NULL");
5743        let config = DydxDataClientConfig {
5744            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5745            http_timeout_secs: Some(2),
5746            ..Default::default()
5747        };
5748
5749        let http_client = DydxHttpClient::new(
5750            config.base_url_http.clone(),
5751            config.http_timeout_secs,
5752            config.http_proxy_url.clone(),
5753            config.is_testnet,
5754            None,
5755        )
5756        .unwrap();
5757
5758        let client =
5759            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5760
5761        let request = RequestInstruments::new(
5762            None,
5763            None,
5764            Some(client_id),
5765            Some(*DYDX_VENUE),
5766            UUID4::new(),
5767            get_atomic_clock_realtime().get_time_ns(),
5768            None,
5769        );
5770
5771        assert!(client.request_instruments(&request).is_ok());
5772
5773        // Should handle null values gracefully
5774        let timeout = tokio::time::Duration::from_secs(3);
5775        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5776            tokio::time::timeout(timeout, rx.recv()).await
5777        {
5778            // Null values should cause parse failures and result in empty/partial response
5779            assert!(resp.correlation_id == request.request_id);
5780        }
5781    }
5782
5783    #[tokio::test]
5784    async fn test_invalid_instrument_id_format() {
5785        // Test handling of non-existent instrument (valid ID format but doesn't exist)
5786        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5787        set_data_event_sender(sender);
5788
5789        let client_id = ClientId::from("DYDX-INVALID-ID");
5790        let config = DydxDataClientConfig::default();
5791
5792        let http_client = DydxHttpClient::new(
5793            config.base_url_http.clone(),
5794            config.http_timeout_secs,
5795            config.http_proxy_url.clone(),
5796            config.is_testnet,
5797            None,
5798        )
5799        .unwrap();
5800
5801        let client =
5802            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5803
5804        // Valid format but non-existent instrument
5805        let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5806
5807        let request = RequestInstrument::new(
5808            non_existent_id,
5809            None,
5810            None,
5811            Some(client_id),
5812            UUID4::new(),
5813            get_atomic_clock_realtime().get_time_ns(),
5814            None,
5815        );
5816
5817        assert!(client.request_instrument(&request).is_ok());
5818
5819        // Should handle non-existent instrument gracefully
5820        let timeout = tokio::time::Duration::from_secs(2);
5821        let result = tokio::time::timeout(timeout, rx.recv()).await;
5822
5823        // Either no response or empty response is acceptable for non-existent instrument
5824        match result {
5825            Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5826                // Empty response acceptable
5827            }
5828            Ok(None) | Err(_) => {
5829                // Timeout or no response also acceptable
5830            }
5831            _ => {}
5832        }
5833    }
5834
5835    #[tokio::test]
5836    async fn test_invalid_date_range_end_before_start() {
5837        // Test handling when end date is before start date
5838        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5839        set_data_event_sender(sender);
5840
5841        let client_id = ClientId::from("DYDX-DATE-RANGE");
5842        let config = DydxDataClientConfig::default();
5843
5844        let http_client = DydxHttpClient::new(
5845            config.base_url_http.clone(),
5846            config.http_timeout_secs,
5847            config.http_proxy_url.clone(),
5848            config.is_testnet,
5849            None,
5850        )
5851        .unwrap();
5852
5853        let client =
5854            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5855
5856        let instrument = create_test_instrument_any();
5857        let instrument_id = instrument.id();
5858        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5859        client.instruments.insert(symbol_key, instrument);
5860
5861        // Invalid date range: end is before start
5862        let start = Utc::now();
5863        let end = start - chrono::Duration::hours(24); // End is 24 hours before start
5864
5865        let request = RequestTrades::new(
5866            instrument_id,
5867            Some(start),
5868            Some(end),
5869            None,
5870            Some(client_id),
5871            UUID4::new(),
5872            get_atomic_clock_realtime().get_time_ns(),
5873            None,
5874        );
5875
5876        assert!(client.request_trades(&request).is_ok());
5877
5878        // Should handle invalid range gracefully - may return empty or no response
5879        let timeout = tokio::time::Duration::from_secs(2);
5880        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5881            tokio::time::timeout(timeout, rx.recv()).await
5882        {
5883            // Empty response expected for invalid date range
5884            assert!(resp.correlation_id == request.request_id);
5885        }
5886    }
5887
5888    #[tokio::test]
5889    async fn test_negative_limit_value() {
5890        // Test handling of limit edge cases
5891        // Note: Rust's NonZeroUsize prevents negative/zero values at type level
5892        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5893        set_data_event_sender(sender);
5894
5895        let client_id = ClientId::from("DYDX-NEG-LIMIT");
5896        let config = DydxDataClientConfig::default();
5897
5898        let http_client = DydxHttpClient::new(
5899            config.base_url_http.clone(),
5900            config.http_timeout_secs,
5901            config.http_proxy_url.clone(),
5902            config.is_testnet,
5903            None,
5904        )
5905        .unwrap();
5906
5907        let client =
5908            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5909
5910        let instrument = create_test_instrument_any();
5911        let instrument_id = instrument.id();
5912        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5913        client.instruments.insert(symbol_key, instrument);
5914
5915        // Minimum valid limit (1)
5916        let request = RequestTrades::new(
5917            instrument_id,
5918            None,
5919            None,
5920            std::num::NonZeroUsize::new(1), // Minimum valid value
5921            Some(client_id),
5922            UUID4::new(),
5923            get_atomic_clock_realtime().get_time_ns(),
5924            None,
5925        );
5926
5927        // Should not panic with minimum limit
5928        assert!(client.request_trades(&request).is_ok());
5929    }
5930
5931    #[tokio::test]
5932    async fn test_zero_limit_value() {
5933        // Test handling of no limit (None = use API default)
5934        // Note: NonZeroUsize type prevents actual zero, so None represents "no limit"
5935        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5936        set_data_event_sender(sender);
5937
5938        let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5939        let config = DydxDataClientConfig::default();
5940
5941        let http_client = DydxHttpClient::new(
5942            config.base_url_http.clone(),
5943            config.http_timeout_secs,
5944            config.http_proxy_url.clone(),
5945            config.is_testnet,
5946            None,
5947        )
5948        .unwrap();
5949
5950        let client =
5951            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5952
5953        let instrument = create_test_instrument_any();
5954        let instrument_id = instrument.id();
5955        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5956        client.instruments.insert(symbol_key, instrument);
5957
5958        let request = RequestTrades::new(
5959            instrument_id,
5960            None,
5961            None,
5962            None, // No limit specified (None = use default)
5963            Some(client_id),
5964            UUID4::new(),
5965            get_atomic_clock_realtime().get_time_ns(),
5966            None,
5967        );
5968
5969        assert!(client.request_trades(&request).is_ok());
5970
5971        // Should handle None limit gracefully (uses API default)
5972        let timeout = tokio::time::Duration::from_secs(2);
5973        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5974            tokio::time::timeout(timeout, rx.recv()).await
5975        {
5976            assert!(resp.correlation_id == request.request_id);
5977        }
5978    }
5979
5980    #[tokio::test]
5981    async fn test_very_large_limit_value() {
5982        // Test handling of extremely large limit values (boundary testing)
5983        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5984        set_data_event_sender(sender);
5985
5986        let client_id = ClientId::from("DYDX-LARGE-LIMIT");
5987        let config = DydxDataClientConfig::default();
5988
5989        let http_client = DydxHttpClient::new(
5990            config.base_url_http.clone(),
5991            config.http_timeout_secs,
5992            config.http_proxy_url.clone(),
5993            config.is_testnet,
5994            None,
5995        )
5996        .unwrap();
5997
5998        let client =
5999            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6000
6001        let instrument = create_test_instrument_any();
6002        let instrument_id = instrument.id();
6003        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6004        client.instruments.insert(symbol_key, instrument);
6005
6006        let request = RequestTrades::new(
6007            instrument_id,
6008            None,
6009            None,
6010            std::num::NonZeroUsize::new(1_000_000), // Very large limit
6011            Some(client_id),
6012            UUID4::new(),
6013            get_atomic_clock_realtime().get_time_ns(),
6014            None,
6015        );
6016
6017        // Should not panic with very large limit
6018        assert!(client.request_trades(&request).is_ok());
6019
6020        // Should handle large limit gracefully
6021        let timeout = tokio::time::Duration::from_secs(2);
6022        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6023            tokio::time::timeout(timeout, rx.recv()).await
6024        {
6025            assert!(resp.correlation_id == request.request_id);
6026        }
6027    }
6028
6029    #[tokio::test]
6030    async fn test_none_limit_uses_default() {
6031        // Test that None limit falls back to default behavior
6032        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6033        set_data_event_sender(sender);
6034
6035        let client_id = ClientId::from("DYDX-NONE-LIMIT");
6036        let config = DydxDataClientConfig::default();
6037
6038        let http_client = DydxHttpClient::new(
6039            config.base_url_http.clone(),
6040            config.http_timeout_secs,
6041            config.http_proxy_url.clone(),
6042            config.is_testnet,
6043            None,
6044        )
6045        .unwrap();
6046
6047        let client =
6048            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6049
6050        let instrument = create_test_instrument_any();
6051        let instrument_id = instrument.id();
6052        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6053        client.instruments.insert(symbol_key, instrument);
6054
6055        let request = RequestTrades::new(
6056            instrument_id,
6057            None,
6058            None,
6059            None, // No limit specified
6060            Some(client_id),
6061            UUID4::new(),
6062            get_atomic_clock_realtime().get_time_ns(),
6063            None,
6064        );
6065
6066        // Should work fine with None limit (uses API default)
6067        assert!(client.request_trades(&request).is_ok());
6068
6069        let timeout = tokio::time::Duration::from_secs(2);
6070        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6071            tokio::time::timeout(timeout, rx.recv()).await
6072        {
6073            assert!(resp.correlation_id == request.request_id);
6074        }
6075    }
6076
6077    #[tokio::test]
6078    async fn test_validation_does_not_panic() {
6079        // Test that various validation edge cases don't cause panics
6080        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6081        set_data_event_sender(sender);
6082
6083        let client_id = ClientId::from("DYDX-VALIDATION");
6084        let config = DydxDataClientConfig::default();
6085
6086        let http_client = DydxHttpClient::new(
6087            config.base_url_http.clone(),
6088            config.http_timeout_secs,
6089            config.http_proxy_url.clone(),
6090            config.is_testnet,
6091            None,
6092        )
6093        .unwrap();
6094
6095        let client =
6096            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6097
6098        let instrument = create_test_instrument_any();
6099        let instrument_id = instrument.id();
6100        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6101        client.instruments.insert(symbol_key, instrument);
6102
6103        // Test 1: Invalid instrument ID
6104        let invalid_id = InstrumentId::from("INVALID.WRONG");
6105        let req1 = RequestInstrument::new(
6106            invalid_id,
6107            None,
6108            None,
6109            Some(client_id),
6110            UUID4::new(),
6111            get_atomic_clock_realtime().get_time_ns(),
6112            None,
6113        );
6114        assert!(client.request_instrument(&req1).is_ok());
6115
6116        // Test 2: Invalid date range
6117        let start = Utc::now();
6118        let end = start - chrono::Duration::hours(1);
6119        let req2 = RequestTrades::new(
6120            instrument_id,
6121            Some(start),
6122            Some(end),
6123            None,
6124            Some(client_id),
6125            UUID4::new(),
6126            get_atomic_clock_realtime().get_time_ns(),
6127            None,
6128        );
6129        assert!(client.request_trades(&req2).is_ok());
6130
6131        // Test 3: Minimum limit (1)
6132        let req3 = RequestTrades::new(
6133            instrument_id,
6134            None,
6135            None,
6136            std::num::NonZeroUsize::new(1),
6137            Some(client_id),
6138            UUID4::new(),
6139            get_atomic_clock_realtime().get_time_ns(),
6140            None,
6141        );
6142        assert!(client.request_trades(&req3).is_ok());
6143
6144        // Test 4: Very large limit
6145        let req4 = RequestTrades::new(
6146            instrument_id,
6147            None,
6148            None,
6149            std::num::NonZeroUsize::new(usize::MAX),
6150            Some(client_id),
6151            UUID4::new(),
6152            get_atomic_clock_realtime().get_time_ns(),
6153            None,
6154        );
6155        assert!(client.request_trades(&req4).is_ok());
6156
6157        // All validation edge cases handled without panic
6158    }
6159
6160    #[tokio::test]
6161    async fn test_instruments_response_has_correct_venue() {
6162        // Verify InstrumentsResponse includes correct DYDX venue
6163        use axum::{Json, Router, routing::get};
6164        use serde_json::{Value, json};
6165
6166        #[derive(Clone)]
6167        struct VenueTestState;
6168
6169        async fn venue_handler() -> Json<Value> {
6170            Json(json!({
6171                "markets": {
6172                    "BTC-USD": {
6173                        "ticker": "BTC-USD",
6174                        "status": "ACTIVE",
6175                        "baseAsset": "BTC",
6176                        "quoteAsset": "USD",
6177                        "stepSize": "0.0001",
6178                        "tickSize": "1",
6179                        "indexPrice": "50000",
6180                        "oraclePrice": "50000",
6181                        "priceChange24H": "1000",
6182                        "nextFundingRate": "0.0001",
6183                        "nextFundingAt": "2024-01-01T00:00:00.000Z",
6184                        "minOrderSize": "0.001",
6185                        "type": "PERPETUAL",
6186                        "initialMarginFraction": "0.05",
6187                        "maintenanceMarginFraction": "0.03",
6188                        "volume24H": "1000000",
6189                        "trades24H": "10000",
6190                        "openInterest": "5000000",
6191                        "incrementalInitialMarginFraction": "0.01",
6192                        "incrementalPositionSize": "10",
6193                        "maxPositionSize": "1000",
6194                        "baselinePositionSize": "100",
6195                        "assetResolution": "10000000000"
6196                    }
6197                }
6198            }))
6199        }
6200
6201        let app = Router::new()
6202            .route("/v4/markets", get(venue_handler))
6203            .with_state(VenueTestState);
6204
6205        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
6206        let server_addr = listener.local_addr().unwrap();
6207        let port = server_addr.port();
6208
6209        tokio::spawn(async move {
6210            axum::serve(listener, app).await.unwrap();
6211        });
6212
6213        wait_for_server(server_addr).await;
6214
6215        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6216        set_data_event_sender(sender);
6217
6218        let client_id = ClientId::from("DYDX-VENUE-TEST");
6219        let config = DydxDataClientConfig {
6220            base_url_http: Some(format!("http://127.0.0.1:{port}")),
6221            http_timeout_secs: Some(2),
6222            ..Default::default()
6223        };
6224
6225        let http_client = DydxHttpClient::new(
6226            config.base_url_http.clone(),
6227            config.http_timeout_secs,
6228            config.http_proxy_url.clone(),
6229            config.is_testnet,
6230            None,
6231        )
6232        .unwrap();
6233
6234        let client =
6235            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6236
6237        let request = RequestInstruments::new(
6238            None,
6239            None,
6240            Some(client_id),
6241            Some(*DYDX_VENUE),
6242            UUID4::new(),
6243            get_atomic_clock_realtime().get_time_ns(),
6244            None,
6245        );
6246
6247        assert!(client.request_instruments(&request).is_ok());
6248
6249        let timeout = tokio::time::Duration::from_secs(3);
6250        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6251            tokio::time::timeout(timeout, rx.recv()).await
6252        {
6253            // Verify venue is DYDX
6254            assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6255        }
6256    }
6257
6258    #[tokio::test]
6259    async fn test_instruments_response_contains_vec_instrument_any() {
6260        // Verify InstrumentsResponse contains Vec<InstrumentAny>
6261        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6262        set_data_event_sender(sender);
6263
6264        let client_id = ClientId::from("DYDX-VEC-TEST");
6265        let config = DydxDataClientConfig::default();
6266
6267        let http_client = DydxHttpClient::new(
6268            config.base_url_http.clone(),
6269            config.http_timeout_secs,
6270            config.http_proxy_url.clone(),
6271            config.is_testnet,
6272            None,
6273        )
6274        .unwrap();
6275
6276        let client =
6277            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6278
6279        let request = RequestInstruments::new(
6280            None,
6281            None,
6282            Some(client_id),
6283            Some(*DYDX_VENUE),
6284            UUID4::new(),
6285            get_atomic_clock_realtime().get_time_ns(),
6286            None,
6287        );
6288
6289        assert!(client.request_instruments(&request).is_ok());
6290
6291        let timeout = tokio::time::Duration::from_secs(2);
6292        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6293            tokio::time::timeout(timeout, rx.recv()).await
6294        {
6295            // Verify data is Vec<InstrumentAny>
6296            assert!(
6297                resp.data.is_empty() || !resp.data.is_empty(),
6298                "data should be Vec<InstrumentAny>"
6299            );
6300        }
6301    }
6302
6303    #[tokio::test]
6304    async fn test_instruments_response_includes_correlation_id() {
6305        // Verify InstrumentsResponse includes correlation_id matching request_id
6306        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6307        set_data_event_sender(sender);
6308
6309        let client_id = ClientId::from("DYDX-CORR-TEST");
6310        let config = DydxDataClientConfig::default();
6311
6312        let http_client = DydxHttpClient::new(
6313            config.base_url_http.clone(),
6314            config.http_timeout_secs,
6315            config.http_proxy_url.clone(),
6316            config.is_testnet,
6317            None,
6318        )
6319        .unwrap();
6320
6321        let client =
6322            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6323
6324        let request_id = UUID4::new();
6325        let request = RequestInstruments::new(
6326            None,
6327            None,
6328            Some(client_id),
6329            Some(*DYDX_VENUE),
6330            request_id,
6331            get_atomic_clock_realtime().get_time_ns(),
6332            None,
6333        );
6334
6335        assert!(client.request_instruments(&request).is_ok());
6336
6337        let timeout = tokio::time::Duration::from_secs(2);
6338        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6339            tokio::time::timeout(timeout, rx.recv()).await
6340        {
6341            // Verify correlation_id matches request_id
6342            assert_eq!(
6343                resp.correlation_id, request_id,
6344                "correlation_id should match request_id"
6345            );
6346        }
6347    }
6348
6349    #[tokio::test]
6350    async fn test_instruments_response_includes_client_id() {
6351        // Verify InstrumentsResponse includes client_id
6352        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6353        set_data_event_sender(sender);
6354
6355        let client_id = ClientId::from("DYDX-CLIENT-TEST");
6356        let config = DydxDataClientConfig::default();
6357
6358        let http_client = DydxHttpClient::new(
6359            config.base_url_http.clone(),
6360            config.http_timeout_secs,
6361            config.http_proxy_url.clone(),
6362            config.is_testnet,
6363            None,
6364        )
6365        .unwrap();
6366
6367        let client =
6368            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6369
6370        let request = RequestInstruments::new(
6371            None,
6372            None,
6373            Some(client_id),
6374            Some(*DYDX_VENUE),
6375            UUID4::new(),
6376            get_atomic_clock_realtime().get_time_ns(),
6377            None,
6378        );
6379
6380        assert!(client.request_instruments(&request).is_ok());
6381
6382        let timeout = tokio::time::Duration::from_secs(2);
6383        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6384            tokio::time::timeout(timeout, rx.recv()).await
6385        {
6386            // Verify client_id is included
6387            assert_eq!(
6388                resp.client_id, client_id,
6389                "client_id should be included in response"
6390            );
6391        }
6392    }
6393
6394    #[tokio::test]
6395    async fn test_instruments_response_includes_timestamps() {
6396        // Verify InstrumentsResponse includes start, end, and ts_init timestamps
6397        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6398        set_data_event_sender(sender);
6399
6400        let client_id = ClientId::from("DYDX-TS-TEST");
6401        let config = DydxDataClientConfig::default();
6402
6403        let http_client = DydxHttpClient::new(
6404            config.base_url_http.clone(),
6405            config.http_timeout_secs,
6406            config.http_proxy_url.clone(),
6407            config.is_testnet,
6408            None,
6409        )
6410        .unwrap();
6411
6412        let client =
6413            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6414
6415        let start = Some(Utc::now() - chrono::Duration::days(1));
6416        let end = Some(Utc::now());
6417        let ts_init = get_atomic_clock_realtime().get_time_ns();
6418
6419        let request = RequestInstruments::new(
6420            start,
6421            end,
6422            Some(client_id),
6423            Some(*DYDX_VENUE),
6424            UUID4::new(),
6425            ts_init,
6426            None,
6427        );
6428
6429        assert!(client.request_instruments(&request).is_ok());
6430
6431        let timeout = tokio::time::Duration::from_secs(2);
6432        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6433            tokio::time::timeout(timeout, rx.recv()).await
6434        {
6435            // Verify timestamps are included
6436            assert!(
6437                resp.start.is_some() || resp.start.is_none(),
6438                "start timestamp field exists"
6439            );
6440            assert!(
6441                resp.end.is_some() || resp.end.is_none(),
6442                "end timestamp field exists"
6443            );
6444            assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6445        }
6446    }
6447
6448    #[tokio::test]
6449    async fn test_instruments_response_includes_params_when_provided() {
6450        // Verify InstrumentsResponse includes params when provided in request
6451        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6452        set_data_event_sender(sender);
6453
6454        let client_id = ClientId::from("DYDX-PARAMS-TEST");
6455        let config = DydxDataClientConfig::default();
6456
6457        let http_client = DydxHttpClient::new(
6458            config.base_url_http.clone(),
6459            config.http_timeout_secs,
6460            config.http_proxy_url.clone(),
6461            config.is_testnet,
6462            None,
6463        )
6464        .unwrap();
6465
6466        let client =
6467            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6468
6469        // Since we can't easily create IndexMap in tests without importing,
6470        // just verify the params field exists by passing None
6471        let request = RequestInstruments::new(
6472            None,
6473            None,
6474            Some(client_id),
6475            Some(*DYDX_VENUE),
6476            UUID4::new(),
6477            get_atomic_clock_realtime().get_time_ns(),
6478            None, // params
6479        );
6480
6481        assert!(client.request_instruments(&request).is_ok());
6482
6483        let timeout = tokio::time::Duration::from_secs(2);
6484        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6485            tokio::time::timeout(timeout, rx.recv()).await
6486        {
6487            // Verify params field exists (structure validation)
6488            let _params = resp.params;
6489        }
6490    }
6491
6492    #[tokio::test]
6493    async fn test_instruments_response_params_none_when_not_provided() {
6494        // Verify InstrumentsResponse params is None when not provided
6495        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6496        set_data_event_sender(sender);
6497
6498        let client_id = ClientId::from("DYDX-NO-PARAMS");
6499        let config = DydxDataClientConfig::default();
6500
6501        let http_client = DydxHttpClient::new(
6502            config.base_url_http.clone(),
6503            config.http_timeout_secs,
6504            config.http_proxy_url.clone(),
6505            config.is_testnet,
6506            None,
6507        )
6508        .unwrap();
6509
6510        let client =
6511            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6512
6513        let request = RequestInstruments::new(
6514            None,
6515            None,
6516            Some(client_id),
6517            Some(*DYDX_VENUE),
6518            UUID4::new(),
6519            get_atomic_clock_realtime().get_time_ns(),
6520            None, // No params
6521        );
6522
6523        assert!(client.request_instruments(&request).is_ok());
6524
6525        let timeout = tokio::time::Duration::from_secs(2);
6526        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6527            tokio::time::timeout(timeout, rx.recv()).await
6528        {
6529            // Verify params field exists and is None when not provided
6530            assert!(
6531                resp.params.is_none(),
6532                "params should be None when not provided"
6533            );
6534        }
6535    }
6536
6537    #[tokio::test]
6538    async fn test_instruments_response_complete_structure() {
6539        // Comprehensive test verifying all InstrumentsResponse fields
6540        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6541        set_data_event_sender(sender);
6542
6543        let client_id = ClientId::from("DYDX-FULL-TEST");
6544        let config = DydxDataClientConfig::default();
6545
6546        let http_client = DydxHttpClient::new(
6547            config.base_url_http.clone(),
6548            config.http_timeout_secs,
6549            config.http_proxy_url.clone(),
6550            config.is_testnet,
6551            None,
6552        )
6553        .unwrap();
6554
6555        let client =
6556            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6557
6558        let request_id = UUID4::new();
6559        let start = Some(Utc::now() - chrono::Duration::hours(1));
6560        let end = Some(Utc::now());
6561        let ts_init = get_atomic_clock_realtime().get_time_ns();
6562
6563        let request = RequestInstruments::new(
6564            start,
6565            end,
6566            Some(client_id),
6567            Some(*DYDX_VENUE),
6568            request_id,
6569            ts_init,
6570            None,
6571        );
6572
6573        assert!(client.request_instruments(&request).is_ok());
6574
6575        let timeout = tokio::time::Duration::from_secs(2);
6576        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6577            tokio::time::timeout(timeout, rx.recv()).await
6578        {
6579            // Comprehensive validation of all fields
6580            assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6581            assert_eq!(
6582                resp.correlation_id, request_id,
6583                "correlation_id should match"
6584            );
6585            assert_eq!(resp.client_id, client_id, "client_id should match");
6586            assert!(resp.ts_init > 0, "ts_init should be set");
6587
6588            // data field exists (Vec<InstrumentAny>)
6589            let _data: Vec<InstrumentAny> = resp.data;
6590
6591            // Timestamp fields can be present or None
6592            let _start = resp.start;
6593            let _end = resp.end;
6594            let _params = resp.params;
6595        }
6596    }
6597
6598    #[tokio::test]
6599    async fn test_instrument_response_properly_boxed() {
6600        // Verify InstrumentResponse is properly boxed in DataResponse
6601        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6602        set_data_event_sender(sender);
6603
6604        let client_id = ClientId::from("DYDX-BOXED-TEST");
6605        let config = DydxDataClientConfig::default();
6606
6607        let http_client = DydxHttpClient::new(
6608            config.base_url_http.clone(),
6609            config.http_timeout_secs,
6610            config.http_proxy_url.clone(),
6611            config.is_testnet,
6612            None,
6613        )
6614        .unwrap();
6615
6616        let client =
6617            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6618
6619        let instrument = create_test_instrument_any();
6620        let instrument_id = instrument.id();
6621        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6622        client.instruments.insert(symbol_key, instrument);
6623
6624        let request = RequestInstrument::new(
6625            instrument_id,
6626            None,
6627            None,
6628            Some(client_id),
6629            UUID4::new(),
6630            get_atomic_clock_realtime().get_time_ns(),
6631            None,
6632        );
6633
6634        assert!(client.request_instrument(&request).is_ok());
6635
6636        let timeout = tokio::time::Duration::from_secs(2);
6637        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6638            tokio::time::timeout(timeout, rx.recv()).await
6639        {
6640            // Verify it's boxed - we receive Box<InstrumentResponse>
6641            let _response: Box<InstrumentResponse> = boxed_resp;
6642            // Successfully matched boxed pattern
6643        }
6644    }
6645
6646    #[tokio::test]
6647    async fn test_instrument_response_contains_single_instrument() {
6648        // Verify InstrumentResponse contains single InstrumentAny
6649        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6650        set_data_event_sender(sender);
6651
6652        let client_id = ClientId::from("DYDX-SINGLE-TEST");
6653        let config = DydxDataClientConfig::default();
6654
6655        let http_client = DydxHttpClient::new(
6656            config.base_url_http.clone(),
6657            config.http_timeout_secs,
6658            config.http_proxy_url.clone(),
6659            config.is_testnet,
6660            None,
6661        )
6662        .unwrap();
6663
6664        let client =
6665            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6666
6667        let instrument = create_test_instrument_any();
6668        let instrument_id = instrument.id();
6669        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6670        client.instruments.insert(symbol_key, instrument.clone());
6671
6672        let request = RequestInstrument::new(
6673            instrument_id,
6674            None,
6675            None,
6676            Some(client_id),
6677            UUID4::new(),
6678            get_atomic_clock_realtime().get_time_ns(),
6679            None,
6680        );
6681
6682        assert!(client.request_instrument(&request).is_ok());
6683
6684        let timeout = tokio::time::Duration::from_secs(2);
6685        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6686            tokio::time::timeout(timeout, rx.recv()).await
6687        {
6688            // Verify data contains single InstrumentAny
6689            let _instrument: InstrumentAny = resp.data;
6690            // Successfully matched InstrumentAny type
6691        }
6692    }
6693
6694    #[tokio::test]
6695    async fn test_instrument_response_has_correct_instrument_id() {
6696        // Verify InstrumentResponse has correct instrument_id
6697        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6698        set_data_event_sender(sender);
6699
6700        let client_id = ClientId::from("DYDX-ID-TEST");
6701        let config = DydxDataClientConfig::default();
6702
6703        let http_client = DydxHttpClient::new(
6704            config.base_url_http.clone(),
6705            config.http_timeout_secs,
6706            config.http_proxy_url.clone(),
6707            config.is_testnet,
6708            None,
6709        )
6710        .unwrap();
6711
6712        let client =
6713            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6714
6715        let instrument = create_test_instrument_any();
6716        let instrument_id = instrument.id();
6717        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6718        client.instruments.insert(symbol_key, instrument);
6719
6720        let request = RequestInstrument::new(
6721            instrument_id,
6722            None,
6723            None,
6724            Some(client_id),
6725            UUID4::new(),
6726            get_atomic_clock_realtime().get_time_ns(),
6727            None,
6728        );
6729
6730        assert!(client.request_instrument(&request).is_ok());
6731
6732        let timeout = tokio::time::Duration::from_secs(2);
6733        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6734            tokio::time::timeout(timeout, rx.recv()).await
6735        {
6736            // Verify instrument_id matches request
6737            assert_eq!(
6738                resp.instrument_id, instrument_id,
6739                "instrument_id should match requested ID"
6740            );
6741        }
6742    }
6743
6744    #[tokio::test]
6745    async fn test_instrument_response_includes_metadata() {
6746        // Verify InstrumentResponse includes all metadata fields
6747        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6748        set_data_event_sender(sender);
6749
6750        let client_id = ClientId::from("DYDX-META-TEST");
6751        let config = DydxDataClientConfig::default();
6752
6753        let http_client = DydxHttpClient::new(
6754            config.base_url_http.clone(),
6755            config.http_timeout_secs,
6756            config.http_proxy_url.clone(),
6757            config.is_testnet,
6758            None,
6759        )
6760        .unwrap();
6761
6762        let client =
6763            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6764
6765        let instrument = create_test_instrument_any();
6766        let instrument_id = instrument.id();
6767        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6768        client.instruments.insert(symbol_key, instrument);
6769
6770        let request_id = UUID4::new();
6771        let start = Some(Utc::now() - chrono::Duration::hours(1));
6772        let end = Some(Utc::now());
6773        let ts_init = get_atomic_clock_realtime().get_time_ns();
6774
6775        let request = RequestInstrument::new(
6776            instrument_id,
6777            start,
6778            end,
6779            Some(client_id),
6780            request_id,
6781            ts_init,
6782            None,
6783        );
6784
6785        assert!(client.request_instrument(&request).is_ok());
6786
6787        let timeout = tokio::time::Duration::from_secs(2);
6788        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6789            tokio::time::timeout(timeout, rx.recv()).await
6790        {
6791            // Verify all metadata fields
6792            assert_eq!(
6793                resp.correlation_id, request_id,
6794                "correlation_id should match"
6795            );
6796            assert_eq!(resp.client_id, client_id, "client_id should match");
6797            assert!(resp.ts_init > 0, "ts_init should be set");
6798
6799            // Timestamp fields exist (can be Some or None)
6800            let _start = resp.start;
6801            let _end = resp.end;
6802
6803            // Params field exists
6804            let _params = resp.params;
6805        }
6806    }
6807
6808    #[tokio::test]
6809    async fn test_instrument_response_matches_requested_instrument() {
6810        // Verify InstrumentResponse data matches the requested instrument exactly
6811        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6812        set_data_event_sender(sender);
6813
6814        let client_id = ClientId::from("DYDX-MATCH-TEST");
6815        let config = DydxDataClientConfig::default();
6816
6817        let http_client = DydxHttpClient::new(
6818            config.base_url_http.clone(),
6819            config.http_timeout_secs,
6820            config.http_proxy_url.clone(),
6821            config.is_testnet,
6822            None,
6823        )
6824        .unwrap();
6825
6826        let client =
6827            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6828
6829        let instrument = create_test_instrument_any();
6830        let instrument_id = instrument.id();
6831        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6832        client.instruments.insert(symbol_key, instrument.clone());
6833
6834        let request = RequestInstrument::new(
6835            instrument_id,
6836            None,
6837            None,
6838            Some(client_id),
6839            UUID4::new(),
6840            get_atomic_clock_realtime().get_time_ns(),
6841            None,
6842        );
6843
6844        assert!(client.request_instrument(&request).is_ok());
6845
6846        let timeout = tokio::time::Duration::from_secs(2);
6847        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6848            tokio::time::timeout(timeout, rx.recv()).await
6849        {
6850            // Verify returned instrument matches requested instrument
6851            assert_eq!(
6852                resp.data.id(),
6853                instrument_id,
6854                "Returned instrument should match requested"
6855            );
6856            assert_eq!(
6857                resp.instrument_id, instrument_id,
6858                "instrument_id field should match"
6859            );
6860
6861            // Both should point to the same instrument
6862            assert_eq!(
6863                resp.data.id(),
6864                resp.instrument_id,
6865                "data.id() should match instrument_id field"
6866            );
6867        }
6868    }
6869
6870    #[tokio::test]
6871    async fn test_instrument_response_complete_structure() {
6872        // Comprehensive test verifying all InstrumentResponse fields
6873        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6874        set_data_event_sender(sender);
6875
6876        let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6877        let config = DydxDataClientConfig::default();
6878
6879        let http_client = DydxHttpClient::new(
6880            config.base_url_http.clone(),
6881            config.http_timeout_secs,
6882            config.http_proxy_url.clone(),
6883            config.is_testnet,
6884            None,
6885        )
6886        .unwrap();
6887
6888        let client =
6889            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6890
6891        let instrument = create_test_instrument_any();
6892        let instrument_id = instrument.id();
6893        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6894        client.instruments.insert(symbol_key, instrument.clone());
6895
6896        let request_id = UUID4::new();
6897        let ts_init = get_atomic_clock_realtime().get_time_ns();
6898
6899        let request = RequestInstrument::new(
6900            instrument_id,
6901            None,
6902            None,
6903            Some(client_id),
6904            request_id,
6905            ts_init,
6906            None,
6907        );
6908
6909        assert!(client.request_instrument(&request).is_ok());
6910
6911        let timeout = tokio::time::Duration::from_secs(2);
6912        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6913            tokio::time::timeout(timeout, rx.recv()).await
6914        {
6915            // Comprehensive validation
6916            // 1. Boxed structure
6917            let _boxed: Box<InstrumentResponse> = resp.clone();
6918
6919            // 2. All required fields present
6920            assert_eq!(resp.correlation_id, request_id);
6921            assert_eq!(resp.client_id, client_id);
6922            assert_eq!(resp.instrument_id, instrument_id);
6923            assert!(resp.ts_init > 0);
6924
6925            // 3. Data field contains InstrumentAny
6926            let returned_instrument: InstrumentAny = resp.data;
6927            assert_eq!(returned_instrument.id(), instrument_id);
6928
6929            // 4. Optional fields exist
6930            let _start = resp.start;
6931            let _end = resp.end;
6932            let _params = resp.params;
6933        }
6934    }
6935
6936    #[tokio::test]
6937    async fn test_trades_response_contains_vec_trade_tick() {
6938        // Verify TradesResponse.data is Vec<TradeTick>
6939        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6940        set_data_event_sender(sender);
6941
6942        let created_at = Utc::now();
6943        let http_trades = vec![
6944            crate::http::models::Trade {
6945                id: "trade-1".to_string(),
6946                side: OrderSide::Buy,
6947                size: dec!(1.0),
6948                price: dec!(100.0),
6949                created_at,
6950                created_at_height: 100,
6951                trade_type: crate::common::enums::DydxTradeType::Limit,
6952            },
6953            crate::http::models::Trade {
6954                id: "trade-2".to_string(),
6955                side: OrderSide::Sell,
6956                size: dec!(2.0),
6957                price: dec!(101.0),
6958                created_at: created_at + chrono::Duration::seconds(1),
6959                created_at_height: 101,
6960                trade_type: crate::common::enums::DydxTradeType::Limit,
6961            },
6962        ];
6963
6964        let trades_response = crate::http::models::TradesResponse {
6965            trades: http_trades,
6966        };
6967
6968        let state = TradesTestState {
6969            response: Arc::new(trades_response),
6970            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
6971            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
6972        };
6973
6974        let addr = start_trades_test_server(state).await;
6975        let base_url = format!("http://{addr}");
6976
6977        let client_id = ClientId::from("DYDX-VEC-TEST");
6978        let config = DydxDataClientConfig {
6979            base_url_http: Some(base_url),
6980            is_testnet: true,
6981            ..Default::default()
6982        };
6983
6984        let http_client = DydxHttpClient::new(
6985            config.base_url_http.clone(),
6986            config.http_timeout_secs,
6987            config.http_proxy_url.clone(),
6988            config.is_testnet,
6989            None,
6990        )
6991        .unwrap();
6992
6993        let client =
6994            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6995
6996        let instrument = create_test_instrument_any();
6997        let instrument_id = instrument.id();
6998        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6999        client.instruments.insert(symbol_key, instrument);
7000
7001        let request = RequestTrades::new(
7002            instrument_id,
7003            None,
7004            None,
7005            None,
7006            Some(client_id),
7007            UUID4::new(),
7008            get_atomic_clock_realtime().get_time_ns(),
7009            None,
7010        );
7011
7012        assert!(client.request_trades(&request).is_ok());
7013
7014        let timeout = tokio::time::Duration::from_millis(500);
7015        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7016            tokio::time::timeout(timeout, rx.recv()).await
7017        {
7018            // Verify data is Vec<TradeTick>
7019            let trade_ticks: Vec<TradeTick> = resp.data;
7020            assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
7021
7022            // Each element is a TradeTick
7023            for tick in &trade_ticks {
7024                assert_eq!(tick.instrument_id, instrument_id);
7025            }
7026        }
7027    }
7028
7029    #[tokio::test]
7030    async fn test_trades_response_has_correct_instrument_id() {
7031        // Verify TradesResponse.instrument_id matches request
7032        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7033        set_data_event_sender(sender);
7034
7035        let created_at = Utc::now();
7036        let http_trade = crate::http::models::Trade {
7037            id: "instrument-id-test".to_string(),
7038            side: OrderSide::Buy,
7039            size: dec!(1.0),
7040            price: dec!(100.0),
7041            created_at,
7042            created_at_height: 100,
7043            trade_type: crate::common::enums::DydxTradeType::Limit,
7044        };
7045
7046        let trades_response = crate::http::models::TradesResponse {
7047            trades: vec![http_trade],
7048        };
7049
7050        let state = TradesTestState {
7051            response: Arc::new(trades_response),
7052            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7053            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7054        };
7055
7056        let addr = start_trades_test_server(state).await;
7057        let base_url = format!("http://{addr}");
7058
7059        let client_id = ClientId::from("DYDX-INSTID-TEST");
7060        let config = DydxDataClientConfig {
7061            base_url_http: Some(base_url),
7062            is_testnet: true,
7063            ..Default::default()
7064        };
7065
7066        let http_client = DydxHttpClient::new(
7067            config.base_url_http.clone(),
7068            config.http_timeout_secs,
7069            config.http_proxy_url.clone(),
7070            config.is_testnet,
7071            None,
7072        )
7073        .unwrap();
7074
7075        let client =
7076            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7077
7078        let instrument = create_test_instrument_any();
7079        let instrument_id = instrument.id();
7080        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7081        client.instruments.insert(symbol_key, instrument);
7082
7083        let request = RequestTrades::new(
7084            instrument_id,
7085            None,
7086            None,
7087            None,
7088            Some(client_id),
7089            UUID4::new(),
7090            get_atomic_clock_realtime().get_time_ns(),
7091            None,
7092        );
7093
7094        assert!(client.request_trades(&request).is_ok());
7095
7096        let timeout = tokio::time::Duration::from_millis(500);
7097        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7098            tokio::time::timeout(timeout, rx.recv()).await
7099        {
7100            // Verify instrument_id field matches request
7101            assert_eq!(
7102                resp.instrument_id, instrument_id,
7103                "TradesResponse.instrument_id should match request"
7104            );
7105
7106            // Verify all trade ticks have the same instrument_id
7107            for tick in &resp.data {
7108                assert_eq!(
7109                    tick.instrument_id, instrument_id,
7110                    "Each TradeTick should have correct instrument_id"
7111                );
7112            }
7113        }
7114    }
7115
7116    #[tokio::test]
7117    async fn test_trades_response_properly_ordered_by_timestamp() {
7118        // Verify trades are ordered by timestamp (ascending)
7119        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7120        set_data_event_sender(sender);
7121
7122        let base_time = Utc::now();
7123        let http_trades = vec![
7124            crate::http::models::Trade {
7125                id: "trade-oldest".to_string(),
7126                side: OrderSide::Buy,
7127                size: dec!(1.0),
7128                price: dec!(100.0),
7129                created_at: base_time,
7130                created_at_height: 100,
7131                trade_type: crate::common::enums::DydxTradeType::Limit,
7132            },
7133            crate::http::models::Trade {
7134                id: "trade-middle".to_string(),
7135                side: OrderSide::Sell,
7136                size: dec!(2.0),
7137                price: dec!(101.0),
7138                created_at: base_time + chrono::Duration::seconds(1),
7139                created_at_height: 101,
7140                trade_type: crate::common::enums::DydxTradeType::Limit,
7141            },
7142            crate::http::models::Trade {
7143                id: "trade-newest".to_string(),
7144                side: OrderSide::Buy,
7145                size: dec!(3.0),
7146                price: dec!(102.0),
7147                created_at: base_time + chrono::Duration::seconds(2),
7148                created_at_height: 102,
7149                trade_type: crate::common::enums::DydxTradeType::Limit,
7150            },
7151        ];
7152
7153        let trades_response = crate::http::models::TradesResponse {
7154            trades: http_trades,
7155        };
7156
7157        let state = TradesTestState {
7158            response: Arc::new(trades_response),
7159            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7160            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7161        };
7162
7163        let addr = start_trades_test_server(state).await;
7164        let base_url = format!("http://{addr}");
7165
7166        let client_id = ClientId::from("DYDX-ORDER-TEST");
7167        let config = DydxDataClientConfig {
7168            base_url_http: Some(base_url),
7169            is_testnet: true,
7170            ..Default::default()
7171        };
7172
7173        let http_client = DydxHttpClient::new(
7174            config.base_url_http.clone(),
7175            config.http_timeout_secs,
7176            config.http_proxy_url.clone(),
7177            config.is_testnet,
7178            None,
7179        )
7180        .unwrap();
7181
7182        let client =
7183            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7184
7185        let instrument = create_test_instrument_any();
7186        let instrument_id = instrument.id();
7187        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7188        client.instruments.insert(symbol_key, instrument);
7189
7190        let request = RequestTrades::new(
7191            instrument_id,
7192            None,
7193            None,
7194            None,
7195            Some(client_id),
7196            UUID4::new(),
7197            get_atomic_clock_realtime().get_time_ns(),
7198            None,
7199        );
7200
7201        assert!(client.request_trades(&request).is_ok());
7202
7203        let timeout = tokio::time::Duration::from_millis(500);
7204        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7205            tokio::time::timeout(timeout, rx.recv()).await
7206        {
7207            // Verify trades are ordered by timestamp
7208            let trade_ticks = resp.data;
7209            assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7210
7211            // Check ascending timestamp order
7212            for i in 1..trade_ticks.len() {
7213                assert!(
7214                    trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7215                    "Trades should be ordered by timestamp (ts_event) in ascending order"
7216                );
7217            }
7218
7219            // Verify specific ordering
7220            assert!(
7221                trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7222                "First trade should be before second"
7223            );
7224            assert!(
7225                trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7226                "Second trade should be before third"
7227            );
7228        }
7229    }
7230
7231    #[tokio::test]
7232    async fn test_trades_response_all_trade_tick_fields_populated() {
7233        // Verify all TradeTick fields are properly populated
7234        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7235        set_data_event_sender(sender);
7236
7237        let created_at = Utc::now();
7238        let http_trade = crate::http::models::Trade {
7239            id: "field-test".to_string(),
7240            side: OrderSide::Buy,
7241            size: dec!(5.5),
7242            price: dec!(12345.67),
7243            created_at,
7244            created_at_height: 999,
7245            trade_type: crate::common::enums::DydxTradeType::Limit,
7246        };
7247
7248        let trades_response = crate::http::models::TradesResponse {
7249            trades: vec![http_trade],
7250        };
7251
7252        let state = TradesTestState {
7253            response: Arc::new(trades_response),
7254            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7255            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7256        };
7257
7258        let addr = start_trades_test_server(state).await;
7259        let base_url = format!("http://{addr}");
7260
7261        let client_id = ClientId::from("DYDX-FIELDS-TEST");
7262        let config = DydxDataClientConfig {
7263            base_url_http: Some(base_url),
7264            is_testnet: true,
7265            ..Default::default()
7266        };
7267
7268        let http_client = DydxHttpClient::new(
7269            config.base_url_http.clone(),
7270            config.http_timeout_secs,
7271            config.http_proxy_url.clone(),
7272            config.is_testnet,
7273            None,
7274        )
7275        .unwrap();
7276
7277        let client =
7278            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7279
7280        let instrument = create_test_instrument_any();
7281        let instrument_id = instrument.id();
7282        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7283        client.instruments.insert(symbol_key, instrument);
7284
7285        let request = RequestTrades::new(
7286            instrument_id,
7287            None,
7288            None,
7289            None,
7290            Some(client_id),
7291            UUID4::new(),
7292            get_atomic_clock_realtime().get_time_ns(),
7293            None,
7294        );
7295
7296        assert!(client.request_trades(&request).is_ok());
7297
7298        let timeout = tokio::time::Duration::from_millis(500);
7299        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7300            tokio::time::timeout(timeout, rx.recv()).await
7301        {
7302            assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7303            let tick = &resp.data[0];
7304
7305            // Verify all TradeTick fields are properly populated
7306            assert_eq!(
7307                tick.instrument_id, instrument_id,
7308                "instrument_id should be set"
7309            );
7310            assert!(tick.price.as_f64() > 0.0, "price should be positive");
7311            assert!(tick.size.as_f64() > 0.0, "size should be positive");
7312
7313            // Verify aggressor_side is set (Buy or Sell)
7314            match tick.aggressor_side {
7315                AggressorSide::Buyer | AggressorSide::Seller => {
7316                    // Valid aggressor side
7317                }
7318                AggressorSide::NoAggressor => {
7319                    panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7320                }
7321            }
7322
7323            // Verify trade_id is set
7324            assert!(
7325                !tick.trade_id.to_string().is_empty(),
7326                "trade_id should be set"
7327            );
7328
7329            // Verify timestamps are set and valid
7330            assert!(tick.ts_event > 0, "ts_event should be set");
7331            assert!(tick.ts_init > 0, "ts_init should be set");
7332            assert!(
7333                tick.ts_init >= tick.ts_event,
7334                "ts_init should be >= ts_event"
7335            );
7336        }
7337    }
7338
7339    #[tokio::test]
7340    async fn test_trades_response_includes_metadata() {
7341        // Verify TradesResponse includes all metadata fields
7342        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7343        set_data_event_sender(sender);
7344
7345        let created_at = Utc::now();
7346        let http_trade = crate::http::models::Trade {
7347            id: "metadata-test".to_string(),
7348            side: OrderSide::Buy,
7349            size: dec!(1.0),
7350            price: dec!(100.0),
7351            created_at,
7352            created_at_height: 100,
7353            trade_type: crate::common::enums::DydxTradeType::Limit,
7354        };
7355
7356        let trades_response = crate::http::models::TradesResponse {
7357            trades: vec![http_trade],
7358        };
7359
7360        let state = TradesTestState {
7361            response: Arc::new(trades_response),
7362            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7363            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7364        };
7365
7366        let addr = start_trades_test_server(state).await;
7367        let base_url = format!("http://{addr}");
7368
7369        let client_id = ClientId::from("DYDX-META-TEST");
7370        let config = DydxDataClientConfig {
7371            base_url_http: Some(base_url),
7372            is_testnet: true,
7373            ..Default::default()
7374        };
7375
7376        let http_client = DydxHttpClient::new(
7377            config.base_url_http.clone(),
7378            config.http_timeout_secs,
7379            config.http_proxy_url.clone(),
7380            config.is_testnet,
7381            None,
7382        )
7383        .unwrap();
7384
7385        let client =
7386            DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7387
7388        let instrument = create_test_instrument_any();
7389        let instrument_id = instrument.id();
7390        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7391        client.instruments.insert(symbol_key, instrument);
7392
7393        let request_id = UUID4::new();
7394        let request = RequestTrades::new(
7395            instrument_id,
7396            None,
7397            None,
7398            None,
7399            Some(client_id),
7400            request_id,
7401            get_atomic_clock_realtime().get_time_ns(),
7402            None,
7403        );
7404
7405        assert!(client.request_trades(&request).is_ok());
7406
7407        let timeout = tokio::time::Duration::from_millis(500);
7408        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7409            tokio::time::timeout(timeout, rx.recv()).await
7410        {
7411            // Verify metadata fields
7412            assert_eq!(
7413                resp.correlation_id, request_id,
7414                "correlation_id should match request"
7415            );
7416            assert_eq!(resp.client_id, client_id, "client_id should be set");
7417            assert_eq!(
7418                resp.instrument_id, instrument_id,
7419                "instrument_id should be set"
7420            );
7421            assert!(resp.ts_init > 0, "ts_init should be set");
7422
7423            let _start = resp.start;
7424            let _end = resp.end;
7425            let _params = resp.params;
7426        }
7427    }
7428
7429    #[tokio::test]
7430    async fn test_orderbook_cache_growth_with_many_instruments() {
7431        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7432        set_data_event_sender(sender);
7433
7434        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7435        let config = DydxDataClientConfig {
7436            base_url_http: Some(base_url),
7437            is_testnet: true,
7438            ..Default::default()
7439        };
7440
7441        let http_client = DydxHttpClient::new(
7442            config.base_url_http.clone(),
7443            config.http_timeout_secs,
7444            config.http_proxy_url.clone(),
7445            config.is_testnet,
7446            None,
7447        )
7448        .unwrap();
7449
7450        let client = DydxDataClient::new(
7451            ClientId::from("dydx_test"),
7452            config,
7453            http_client,
7454            create_test_ws_client(),
7455        )
7456        .unwrap();
7457
7458        let initial_capacity = client.order_books.capacity();
7459
7460        for i in 0..100 {
7461            let symbol = format!("INSTRUMENT-{i}");
7462            let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7463            client.order_books.insert(
7464                instrument_id,
7465                OrderBook::new(instrument_id, BookType::L2_MBP),
7466            );
7467        }
7468
7469        assert_eq!(client.order_books.len(), 100);
7470        assert!(client.order_books.capacity() >= initial_capacity);
7471
7472        client.order_books.clear();
7473        assert_eq!(client.order_books.len(), 0);
7474    }
7475
7476    #[rstest]
7477    fn test_instrument_id_validation_rejects_invalid_formats() {
7478        // InstrumentId::from() validates format and panics on invalid input
7479        let test_cases = vec![
7480            ("", "Empty string missing separator"),
7481            ("INVALID", "No venue separator"),
7482            ("NO-VENUE", "No venue separator"),
7483            (".DYDX", "Empty symbol"),
7484            ("SYMBOL.", "Empty venue"),
7485        ];
7486
7487        for (invalid_id, description) in test_cases {
7488            let result = std::panic::catch_unwind(|| InstrumentId::from(invalid_id));
7489            assert!(
7490                result.is_err(),
7491                "Expected {invalid_id} to panic: {description}"
7492            );
7493        }
7494    }
7495
7496    #[rstest]
7497    fn test_instrument_id_validation_accepts_valid_formats() {
7498        let valid_ids = vec![
7499            "BTC-USD-PERP.DYDX",
7500            "ETH-USD-PERP.DYDX",
7501            "SOL-USD.DYDX",
7502            "AVAX-USD-PERP.DYDX",
7503        ];
7504
7505        for valid_id in valid_ids {
7506            let instrument_id = InstrumentId::from(valid_id);
7507            assert!(
7508                !instrument_id.symbol.as_str().is_empty()
7509                    && !instrument_id.venue.as_str().is_empty(),
7510                "Expected {valid_id} to have non-empty symbol and venue"
7511            );
7512        }
7513    }
7514
7515    #[tokio::test]
7516    async fn test_request_bars_with_inverted_date_range() {
7517        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7518        set_data_event_sender(sender);
7519
7520        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7521        let config = DydxDataClientConfig {
7522            base_url_http: Some(base_url),
7523            is_testnet: true,
7524            ..Default::default()
7525        };
7526
7527        let http_client = DydxHttpClient::new(
7528            config.base_url_http.clone(),
7529            config.http_timeout_secs,
7530            config.http_proxy_url.clone(),
7531            config.is_testnet,
7532            None,
7533        )
7534        .unwrap();
7535
7536        let client = DydxDataClient::new(
7537            ClientId::from("dydx_test"),
7538            config,
7539            http_client,
7540            create_test_ws_client(),
7541        )
7542        .unwrap();
7543
7544        let instrument = create_test_instrument_any();
7545        let instrument_id = instrument.id();
7546        client
7547            .instruments
7548            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7549
7550        let spec = BarSpecification {
7551            step: std::num::NonZeroUsize::new(1).unwrap(),
7552            aggregation: BarAggregation::Minute,
7553            price_type: PriceType::Last,
7554        };
7555        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7556
7557        let now = Utc::now();
7558        let start = Some(now);
7559        let end = Some(now - chrono::Duration::hours(1));
7560
7561        let request = RequestBars::new(
7562            bar_type,
7563            start,
7564            end,
7565            None,
7566            Some(ClientId::from("dydx_test")),
7567            UUID4::new(),
7568            get_atomic_clock_realtime().get_time_ns(),
7569            None,
7570        );
7571
7572        let result = client.request_bars(&request);
7573        assert!(result.is_ok());
7574    }
7575
7576    #[tokio::test]
7577    async fn test_request_bars_with_zero_limit() {
7578        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7579        set_data_event_sender(sender);
7580
7581        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7582        let config = DydxDataClientConfig {
7583            base_url_http: Some(base_url),
7584            is_testnet: true,
7585            ..Default::default()
7586        };
7587
7588        let http_client = DydxHttpClient::new(
7589            config.base_url_http.clone(),
7590            config.http_timeout_secs,
7591            config.http_proxy_url.clone(),
7592            config.is_testnet,
7593            None,
7594        )
7595        .unwrap();
7596
7597        let client = DydxDataClient::new(
7598            ClientId::from("dydx_test"),
7599            config,
7600            http_client,
7601            create_test_ws_client(),
7602        )
7603        .unwrap();
7604
7605        let instrument = create_test_instrument_any();
7606        let instrument_id = instrument.id();
7607        client
7608            .instruments
7609            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7610
7611        let spec = BarSpecification {
7612            step: std::num::NonZeroUsize::new(1).unwrap(),
7613            aggregation: BarAggregation::Minute,
7614            price_type: PriceType::Last,
7615        };
7616        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7617
7618        let request = RequestBars::new(
7619            bar_type,
7620            None,
7621            None,
7622            Some(std::num::NonZeroUsize::new(1).unwrap()),
7623            Some(ClientId::from("dydx_test")),
7624            UUID4::new(),
7625            get_atomic_clock_realtime().get_time_ns(),
7626            None,
7627        );
7628
7629        let result = client.request_bars(&request);
7630        assert!(result.is_ok());
7631    }
7632
7633    #[tokio::test]
7634    async fn test_request_trades_with_excessive_limit() {
7635        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7636        set_data_event_sender(sender);
7637
7638        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7639        let config = DydxDataClientConfig {
7640            base_url_http: Some(base_url),
7641            is_testnet: true,
7642            ..Default::default()
7643        };
7644
7645        let http_client = DydxHttpClient::new(
7646            config.base_url_http.clone(),
7647            config.http_timeout_secs,
7648            config.http_proxy_url.clone(),
7649            config.is_testnet,
7650            None,
7651        )
7652        .unwrap();
7653
7654        let client = DydxDataClient::new(
7655            ClientId::from("dydx_test"),
7656            config,
7657            http_client,
7658            create_test_ws_client(),
7659        )
7660        .unwrap();
7661
7662        let instrument = create_test_instrument_any();
7663        let instrument_id = instrument.id();
7664        client
7665            .instruments
7666            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7667
7668        let request = RequestTrades::new(
7669            instrument_id,
7670            None,
7671            None,
7672            Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7673            Some(ClientId::from("dydx_test")),
7674            UUID4::new(),
7675            get_atomic_clock_realtime().get_time_ns(),
7676            None,
7677        );
7678
7679        let result = client.request_trades(&request);
7680        assert!(result.is_ok());
7681    }
7682
7683    #[rstest]
7684    fn test_candle_topic_format() {
7685        let instrument_id = InstrumentId::new(Symbol::from("BTC-USD-PERP"), Venue::from("DYDX"));
7686        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
7687        let resolution = "1MIN";
7688        let topic = format!("{ticker}/{resolution}");
7689
7690        assert_eq!(topic, "BTC-USD/1MIN");
7691        assert!(!topic.contains("-PERP"));
7692        assert!(!topic.contains(".DYDX"));
7693    }
7694}