nautilus_dydx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the dYdX adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::DashMap;
25use nautilus_common::{
26    live::{runner::get_data_event_sender, runtime::get_runtime},
27    messages::{
28        DataEvent, DataResponse,
29        data::{
30            BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
31            RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
32            SubscribeBookSnapshots, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
33            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
34            UnsubscribeBookSnapshots, UnsubscribeInstrument, UnsubscribeInstruments,
35            UnsubscribeQuotes, UnsubscribeTrades,
36        },
37    },
38};
39use nautilus_core::{
40    UnixNanos,
41    datetime::datetime_to_unix_nanos,
42    time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_data::client::DataClient;
45use nautilus_model::{
46    data::{
47        Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, IndexPriceUpdate,
48        OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, QuoteTick,
49    },
50    enums::{BarAggregation, BookAction, BookType, OrderSide, RecordFlag},
51    identifiers::{ClientId, InstrumentId, Venue},
52    instruments::{Instrument, InstrumentAny},
53    orderbook::OrderBook,
54    types::{Price, Quantity, price::PriceRaw},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58use ustr::Ustr;
59
60use crate::{
61    common::{consts::DYDX_VENUE, parse::extract_raw_symbol},
62    config::DydxDataClientConfig,
63    http::client::DydxHttpClient,
64    types::DydxOraclePrice,
65    websocket::client::DydxWebSocketClient,
66};
67
68/// Groups WebSocket message handling dependencies.
69struct WsMessageContext<'a> {
70    data_sender: &'a tokio::sync::mpsc::UnboundedSender<DataEvent>,
71    instruments: &'a Arc<DashMap<Ustr, InstrumentAny>>,
72    order_books: &'a Arc<DashMap<InstrumentId, OrderBook>>,
73    last_quotes: &'a Arc<DashMap<InstrumentId, QuoteTick>>,
74    ws_client: &'a Option<DydxWebSocketClient>,
75    active_orderbook_subs: &'a Arc<DashMap<InstrumentId, ()>>,
76    active_trade_subs: &'a Arc<DashMap<InstrumentId, ()>>,
77    active_bar_subs: &'a Arc<DashMap<(InstrumentId, String), BarType>>,
78    incomplete_bars: &'a Arc<DashMap<BarType, Bar>>,
79}
80
/// dYdX data client for live market data streaming and historical data requests.
///
/// This client integrates with the Nautilus DataEngine to provide:
/// - Real-time market data via WebSocket subscriptions
/// - Historical data via REST API requests
/// - Automatic instrument discovery and caching
/// - Connection lifecycle management
///
/// All caches and subscription registries are `Arc<DashMap<..>>` so they can be
/// cloned into spawned background tasks.
#[derive(Debug)]
pub struct DydxDataClient {
    /// The client ID for this data client.
    client_id: ClientId,
    /// Configuration for the data client.
    config: DydxDataClientConfig,
    /// HTTP client for REST API requests.
    http_client: DydxHttpClient,
    /// WebSocket client for real-time data streaming; `None` when no streaming
    /// client was supplied at construction.
    ws_client: Option<DydxWebSocketClient>,
    /// Whether the client is currently connected.
    is_connected: AtomicBool,
    /// Cancellation token for async operations.
    cancellation_token: CancellationToken,
    /// Handles of background tasks spawned while connecting.
    tasks: Vec<JoinHandle<()>>,
    /// Channel sender for emitting data events to the DataEngine.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Cached instruments by symbol (shared with HTTP client via `Arc<DashMap<Ustr, InstrumentAny>>`).
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// High-resolution clock for timestamps.
    clock: &'static AtomicTime,
    /// Local order books maintained for generating quotes and resolving crosses.
    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
    /// Last quote tick per instrument (used for quote generation from book deltas).
    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
    /// Incomplete bars cache for bar aggregation.
    /// Tracks bars not yet closed (ts_event > current_time), keyed by BarType.
    /// Bars are emitted only when they close (ts_event <= current_time).
    incomplete_bars: Arc<DashMap<BarType, Bar>>,
    /// WebSocket topic to BarType mappings.
    /// Maps dYdX candle topics (e.g., "BTC-USD/1MIN") to Nautilus BarType.
    /// Used for subscription validation and reconnection recovery.
    bar_type_mappings: Arc<DashMap<String, BarType>>,
    /// Active orderbook subscriptions for periodic snapshot refresh.
    /// The map is used as a set: only the keys carry information.
    active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
    /// Active trade subscriptions for reconnection recovery.
    /// The map is used as a set: only the keys carry information.
    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
    /// Active bar/candle subscriptions for reconnection recovery (maps instrument+resolution to BarType).
    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
}
129
130impl DydxDataClient {
131    /// Maps Nautilus BarType spec to dYdX candle resolution string.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the bar aggregation or step is not supported by dYdX.
136    fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
137        match spec.step.get() {
138            1 => match spec.aggregation {
139                BarAggregation::Minute => Ok("1MIN"),
140                BarAggregation::Hour => Ok("1HOUR"),
141                BarAggregation::Day => Ok("1DAY"),
142                _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
143            },
144            5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
145            15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
146            30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
147            4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
148            step => anyhow::bail!(
149                "Unsupported bar step: {step} with aggregation {:?}",
150                spec.aggregation
151            ),
152        }
153    }
154
155    /// Creates a new [`DydxDataClient`] instance.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the client fails to initialize.
160    pub fn new(
161        client_id: ClientId,
162        config: DydxDataClientConfig,
163        http_client: DydxHttpClient,
164        ws_client: Option<DydxWebSocketClient>,
165    ) -> anyhow::Result<Self> {
166        let clock = get_atomic_clock_realtime();
167        let data_sender = get_data_event_sender();
168
169        // Clone the instruments cache before moving http_client
170        let instruments_cache = http_client.instruments().clone();
171
172        Ok(Self {
173            client_id,
174            config,
175            http_client,
176            ws_client,
177            is_connected: AtomicBool::new(false),
178            cancellation_token: CancellationToken::new(),
179            tasks: Vec::new(),
180            data_sender,
181            instruments: instruments_cache,
182            clock,
183            order_books: Arc::new(DashMap::new()),
184            last_quotes: Arc::new(DashMap::new()),
185            incomplete_bars: Arc::new(DashMap::new()),
186            bar_type_mappings: Arc::new(DashMap::new()),
187            active_orderbook_subs: Arc::new(DashMap::new()),
188            active_trade_subs: Arc::new(DashMap::new()),
189            active_bar_subs: Arc::new(DashMap::new()),
190        })
191    }
192
193    /// Returns the venue for this data client.
194    #[must_use]
195    pub fn venue(&self) -> Venue {
196        *DYDX_VENUE
197    }
198
199    fn ws_client(&self) -> anyhow::Result<&DydxWebSocketClient> {
200        self.ws_client
201            .as_ref()
202            .context("websocket client not initialized; call connect first")
203    }
204
205    /// Mutable WebSocket client access for operations requiring mutable references.
206    fn ws_client_mut(&mut self) -> anyhow::Result<&mut DydxWebSocketClient> {
207        self.ws_client
208            .as_mut()
209            .context("websocket client not initialized; call connect first")
210    }
211
212    /// Returns `true` when the client is connected.
213    #[must_use]
214    pub fn is_connected(&self) -> bool {
215        self.is_connected.load(Ordering::Relaxed)
216    }
217
218    /// Spawns an async WebSocket task with error handling.
219    ///
220    /// This helper ensures consistent error logging across all subscription methods.
221    fn spawn_ws<F>(&self, fut: F, context: &'static str)
222    where
223        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
224    {
225        get_runtime().spawn(async move {
226            if let Err(e) = fut.await {
227                tracing::error!("{context}: {e:?}");
228            }
229        });
230    }
231
232    /// Bootstrap instruments from the dYdX Indexer API.
233    ///
234    /// This method:
235    /// 1. Fetches all available instruments from the REST API
236    /// 2. Caches them in the HTTP client
237    /// 3. Caches them in the WebSocket client (if present)
238    /// 4. Populates the local instruments cache
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if:
243    /// - The HTTP request fails.
244    /// - Instrument parsing fails.
245    ///
246    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
247        tracing::info!("Bootstrapping dYdX instruments");
248
249        // Populates all HTTP cache layers (instruments, clob_pair_id, market_params)
250        self.http_client
251            .fetch_and_cache_instruments()
252            .await
253            .context("failed to load instruments from dYdX")?;
254
255        let instruments: Vec<InstrumentAny> = self
256            .http_client
257            .instruments()
258            .iter()
259            .map(|entry| entry.value().clone())
260            .collect();
261
262        if instruments.is_empty() {
263            tracing::warn!("No dYdX instruments were loaded");
264            return Ok(instruments);
265        }
266
267        tracing::info!("Loaded {} dYdX instruments", instruments.len());
268
269        if let Some(ref ws) = self.ws_client {
270            ws.cache_instruments(instruments.clone());
271        }
272
273        Ok(instruments)
274    }
275}
276
277#[async_trait::async_trait(?Send)]
278impl DataClient for DydxDataClient {
279    fn client_id(&self) -> ClientId {
280        self.client_id
281    }
282
283    fn venue(&self) -> Option<Venue> {
284        Some(*DYDX_VENUE)
285    }
286
287    fn start(&mut self) -> anyhow::Result<()> {
288        tracing::info!(
289            client_id = %self.client_id,
290            is_testnet = self.http_client.is_testnet(),
291            "Starting dYdX data client"
292        );
293        Ok(())
294    }
295
296    fn stop(&mut self) -> anyhow::Result<()> {
297        tracing::info!("Stopping dYdX data client {}", self.client_id);
298        self.cancellation_token.cancel();
299        self.is_connected.store(false, Ordering::Relaxed);
300        Ok(())
301    }
302
303    fn reset(&mut self) -> anyhow::Result<()> {
304        tracing::debug!("Resetting dYdX data client {}", self.client_id);
305        self.is_connected.store(false, Ordering::Relaxed);
306        self.cancellation_token = CancellationToken::new();
307        self.tasks.clear();
308        Ok(())
309    }
310
311    fn dispose(&mut self) -> anyhow::Result<()> {
312        tracing::debug!("Disposing dYdX data client {}", self.client_id);
313        self.stop()
314    }
315
316    async fn connect(&mut self) -> anyhow::Result<()> {
317        if self.is_connected() {
318            return Ok(());
319        }
320
321        tracing::info!("Connecting dYdX data client");
322
323        // Bootstrap instruments first
324        self.bootstrap_instruments().await?;
325
326        // Connect WebSocket client and subscribe to market updates
327        if self.ws_client.is_some() {
328            let ws = self.ws_client_mut()?;
329
330            ws.connect()
331                .await
332                .context("failed to connect dYdX websocket")?;
333
334            ws.subscribe_markets()
335                .await
336                .context("failed to subscribe to markets channel")?;
337
338            // Start message processing task (handler already converts to NautilusWsMessage)
339            if let Some(rx) = ws.take_receiver() {
340                let data_tx = self.data_sender.clone();
341                let instruments = self.instruments.clone();
342                let order_books = self.order_books.clone();
343                let last_quotes = self.last_quotes.clone();
344                let ws_client = self.ws_client.clone();
345                let active_orderbook_subs = self.active_orderbook_subs.clone();
346                let active_trade_subs = self.active_trade_subs.clone();
347                let active_bar_subs = self.active_bar_subs.clone();
348                let incomplete_bars = self.incomplete_bars.clone();
349
350                let task = get_runtime().spawn(async move {
351                    let mut rx = rx;
352                    while let Some(msg) = rx.recv().await {
353                        let ctx = WsMessageContext {
354                            data_sender: &data_tx,
355                            instruments: &instruments,
356                            order_books: &order_books,
357                            last_quotes: &last_quotes,
358                            ws_client: &ws_client,
359                            active_orderbook_subs: &active_orderbook_subs,
360                            active_trade_subs: &active_trade_subs,
361                            active_bar_subs: &active_bar_subs,
362                            incomplete_bars: &incomplete_bars,
363                        };
364                        Self::handle_ws_message(msg, &ctx);
365                    }
366                });
367                self.tasks.push(task);
368            } else {
369                tracing::warn!("No inbound WS receiver available after connect");
370            }
371        }
372
373        // Start orderbook snapshot refresh task
374        self.start_orderbook_refresh_task()?;
375
376        // Start instrument refresh task
377        self.start_instrument_refresh_task()?;
378
379        self.is_connected.store(true, Ordering::Relaxed);
380        tracing::info!("Connected dYdX data client");
381
382        Ok(())
383    }
384
385    async fn disconnect(&mut self) -> anyhow::Result<()> {
386        if !self.is_connected() {
387            return Ok(());
388        }
389
390        tracing::info!("Disconnecting dYdX data client");
391
392        // Disconnect WebSocket client if present
393        if let Some(ref mut ws) = self.ws_client {
394            ws.disconnect()
395                .await
396                .context("failed to disconnect dYdX websocket")?;
397        }
398
399        self.is_connected.store(false, Ordering::Relaxed);
400        tracing::info!("Disconnected dYdX data client");
401
402        Ok(())
403    }
404
405    fn is_connected(&self) -> bool {
406        self.is_connected.load(Ordering::Relaxed)
407    }
408
409    fn is_disconnected(&self) -> bool {
410        !self.is_connected()
411    }
412
413    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
414        // dYdX uses a global markets channel which streams instruments implicitly.
415        // There is no dedicated instruments subscription, so this is a no-op to
416        // mirror the behaviour of `subscribe_instruments`.
417        tracing::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
418        Ok(())
419    }
420
421    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
422        // dYdX does not support per-instrument instrument feed subscriptions.
423        // The markets channel always streams all instruments, so this is a no-op.
424        tracing::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
425        Ok(())
426    }
427
428    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
429        // dYdX markets channel auto-subscribes to all instruments
430        // No explicit subscription needed - already handled in connect()
431        tracing::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
432        Ok(())
433    }
434
435    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
436        // dYdX markets channel auto-subscribes to all instruments
437        // Individual instrument subscriptions not supported - full feed only
438        tracing::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
439        Ok(())
440    }
441
442    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
443        let ws = self.ws_client()?.clone();
444        let instrument_id = cmd.instrument_id;
445
446        // Track active subscription for reconnection recovery
447        self.active_trade_subs.insert(instrument_id, ());
448
449        self.spawn_ws(
450            async move {
451                ws.subscribe_trades(instrument_id)
452                    .await
453                    .context("trade subscription")
454            },
455            "dYdX trade subscription",
456        );
457
458        Ok(())
459    }
460
461    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
462        if cmd.book_type != BookType::L2_MBP {
463            anyhow::bail!(
464                "dYdX only supports L2_MBP order book deltas, received {:?}",
465                cmd.book_type
466            );
467        }
468
469        // Ensure local order book exists for this instrument.
470        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
471
472        // Track active subscription for periodic refresh
473        self.active_orderbook_subs.insert(cmd.instrument_id, ());
474
475        let ws = self.ws_client()?.clone();
476        let instrument_id = cmd.instrument_id;
477
478        self.spawn_ws(
479            async move {
480                ws.subscribe_orderbook(instrument_id)
481                    .await
482                    .context("orderbook subscription")
483            },
484            "dYdX orderbook subscription",
485        );
486
487        Ok(())
488    }
489
490    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
491        if cmd.book_type != BookType::L2_MBP {
492            anyhow::bail!(
493                "dYdX only supports L2_MBP order book snapshots, received {:?}",
494                cmd.book_type
495            );
496        }
497
498        // Track active subscription for periodic refresh
499        self.active_orderbook_subs.insert(cmd.instrument_id, ());
500
501        let ws = self.ws_client()?.clone();
502        let instrument_id = cmd.instrument_id;
503
504        get_runtime().spawn(async move {
505            if let Err(e) = ws.subscribe_orderbook(instrument_id).await {
506                tracing::error!(
507                    "Failed to subscribe to orderbook snapshot for {instrument_id}: {e:?}"
508                );
509            }
510        });
511
512        Ok(())
513    }
514
515    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
516        // dYdX doesn't have a dedicated quotes channel
517        // Quotes are synthesized from order book deltas
518        tracing::debug!(
519            "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
520            cmd.instrument_id
521        );
522
523        // Simply delegate to book deltas subscription
524        let book_cmd = SubscribeBookDeltas {
525            client_id: cmd.client_id,
526            venue: cmd.venue,
527            instrument_id: cmd.instrument_id,
528            book_type: BookType::L2_MBP,
529            depth: None,
530            managed: false,
531            params: None,
532            command_id: cmd.command_id,
533            ts_init: cmd.ts_init,
534        };
535
536        self.subscribe_book_deltas(&book_cmd)
537    }
538
539    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
540        let ws = self.ws_client()?.clone();
541        let instrument_id = cmd.bar_type.instrument_id();
542        let spec = cmd.bar_type.spec();
543
544        // Use centralized bar spec mapping
545        let resolution = Self::map_bar_spec_to_resolution(&spec)?;
546
547        // Track active subscription for reconnection recovery
548        let bar_type = cmd.bar_type;
549        self.active_bar_subs
550            .insert((instrument_id, resolution.to_string()), bar_type);
551
552        // Register topic → BarType mapping for validation and lookup
553        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
554        let topic = format!("{ticker}/{resolution}");
555        self.bar_type_mappings.insert(topic.clone(), bar_type);
556
557        self.spawn_ws(
558            async move {
559                // Register bar type in handler BEFORE subscribing to avoid race condition
560                if let Err(e) =
561                    ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
562                        topic,
563                        bar_type,
564                    })
565                {
566                    anyhow::bail!("Failed to register bar type: {e}");
567                }
568
569                // Delay to ensure handler processes registration before candle messages arrive
570                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
571
572                ws.subscribe_candles(instrument_id, resolution)
573                    .await
574                    .context("candles subscription")
575            },
576            "dYdX candles subscription",
577        );
578
579        Ok(())
580    }
581
582    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
583        // Remove from active subscription tracking
584        self.active_trade_subs.remove(&cmd.instrument_id);
585
586        let ws = self.ws_client()?.clone();
587        let instrument_id = cmd.instrument_id;
588
589        self.spawn_ws(
590            async move {
591                ws.unsubscribe_trades(instrument_id)
592                    .await
593                    .context("trade unsubscription")
594            },
595            "dYdX trade unsubscription",
596        );
597
598        Ok(())
599    }
600
601    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
602        // Remove from active subscription tracking
603        self.active_orderbook_subs.remove(&cmd.instrument_id);
604
605        let ws = self.ws_client()?.clone();
606        let instrument_id = cmd.instrument_id;
607
608        self.spawn_ws(
609            async move {
610                ws.unsubscribe_orderbook(instrument_id)
611                    .await
612                    .context("orderbook unsubscription")
613            },
614            "dYdX orderbook unsubscription",
615        );
616
617        Ok(())
618    }
619
620    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
621        // dYdX orderbook channel provides both snapshots and deltas.
622        // Unsubscribing snapshots uses the same underlying channel as deltas.
623        // Remove from active subscription tracking
624        self.active_orderbook_subs.remove(&cmd.instrument_id);
625
626        let ws = self.ws_client()?.clone();
627        let instrument_id = cmd.instrument_id;
628
629        self.spawn_ws(
630            async move {
631                ws.unsubscribe_orderbook(instrument_id)
632                    .await
633                    .context("orderbook snapshot unsubscription")
634            },
635            "dYdX orderbook snapshot unsubscription",
636        );
637
638        Ok(())
639    }
640
641    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
642        // dYdX doesn't have a dedicated quotes channel; quotes are derived from book deltas.
643        tracing::debug!(
644            "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
645            cmd.instrument_id
646        );
647
648        let book_cmd = UnsubscribeBookDeltas {
649            instrument_id: cmd.instrument_id,
650            client_id: cmd.client_id,
651            venue: cmd.venue,
652            command_id: cmd.command_id,
653            ts_init: cmd.ts_init,
654            params: cmd.params.clone(),
655        };
656
657        self.unsubscribe_book_deltas(&book_cmd)
658    }
659
660    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
661        let ws = self.ws_client()?.clone();
662        let instrument_id = cmd.bar_type.instrument_id();
663        let spec = cmd.bar_type.spec();
664
665        // Map BarType spec to dYdX candle resolution string
666        let resolution = match spec.step.get() {
667            1 => match spec.aggregation {
668                BarAggregation::Minute => "1MIN",
669                BarAggregation::Hour => "1HOUR",
670                BarAggregation::Day => "1DAY",
671                _ => {
672                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
673                }
674            },
675            5 => {
676                if spec.aggregation == BarAggregation::Minute {
677                    "5MINS"
678                } else {
679                    anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
680                }
681            }
682            15 => {
683                if spec.aggregation == BarAggregation::Minute {
684                    "15MINS"
685                } else {
686                    anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
687                }
688            }
689            30 => {
690                if spec.aggregation == BarAggregation::Minute {
691                    "30MINS"
692                } else {
693                    anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
694                }
695            }
696            4 => {
697                if spec.aggregation == BarAggregation::Hour {
698                    "4HOURS"
699                } else {
700                    anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
701                }
702            }
703            step => {
704                anyhow::bail!("Unsupported bar step: {step}");
705            }
706        };
707
708        // Remove from active subscription tracking
709        self.active_bar_subs
710            .remove(&(instrument_id, resolution.to_string()));
711
712        // Unregister bar type from handler and local mappings
713        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
714        let topic = format!("{ticker}/{resolution}");
715        self.bar_type_mappings.remove(&topic);
716
717        if let Err(e) =
718            ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
719        {
720            tracing::warn!("Failed to unregister bar type: {e}");
721        }
722
723        self.spawn_ws(
724            async move {
725                ws.unsubscribe_candles(instrument_id, resolution)
726                    .await
727                    .context("candles unsubscription")
728            },
729            "dYdX candles unsubscription",
730        );
731
732        Ok(())
733    }
734
735    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
736        let instruments_cache = self.instruments.clone();
737        let sender = self.data_sender.clone();
738        let http = self.http_client.clone();
739        let instrument_id = request.instrument_id;
740        let request_id = request.request_id;
741        let client_id = request.client_id.unwrap_or(self.client_id);
742        let start = request.start;
743        let end = request.end;
744        let params = request.params.clone();
745        let clock = self.clock;
746        let start_nanos = datetime_to_unix_nanos(start);
747        let end_nanos = datetime_to_unix_nanos(end);
748
749        get_runtime().spawn(async move {
750            // First try to get from cache
751            let symbol = Ustr::from(instrument_id.symbol.as_str());
752            let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
753                tracing::debug!("Found instrument {instrument_id} in cache");
754                Some(cached.clone())
755            } else {
756                // Not in cache, fetch from API
757                tracing::debug!("Instrument {instrument_id} not in cache, fetching from API");
758                match http.request_instruments(None, None, None).await {
759                    Ok(instruments) => {
760                        // Cache all fetched instruments
761                        for inst in &instruments {
762                            upsert_instrument(&instruments_cache, inst.clone());
763                        }
764                        // Find the requested instrument
765                        instruments.into_iter().find(|i| i.id() == instrument_id)
766                    }
767                    Err(e) => {
768                        tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
769                        None
770                    }
771                }
772            };
773
774            if let Some(inst) = instrument {
775                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
776                    request_id,
777                    client_id,
778                    instrument_id,
779                    inst,
780                    start_nanos,
781                    end_nanos,
782                    clock.get_time_ns(),
783                    params,
784                )));
785
786                if let Err(e) = sender.send(DataEvent::Response(response)) {
787                    tracing::error!("Failed to send instrument response: {e}");
788                }
789            } else {
790                tracing::error!("Instrument {instrument_id} not found");
791            }
792        });
793
794        Ok(())
795    }
796
797    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
798        let http = self.http_client.clone();
799        let sender = self.data_sender.clone();
800        let instruments_cache = self.instruments.clone();
801        let request_id = request.request_id;
802        let client_id = request.client_id.unwrap_or(self.client_id);
803        let venue = self.venue();
804        let start = request.start;
805        let end = request.end;
806        let params = request.params.clone();
807        let clock = self.clock;
808        let start_nanos = datetime_to_unix_nanos(start);
809        let end_nanos = datetime_to_unix_nanos(end);
810
811        get_runtime().spawn(async move {
812            match http.request_instruments(None, None, None).await {
813                Ok(instruments) => {
814                    tracing::info!("Fetched {} instruments from dYdX", instruments.len());
815
816                    // Cache all instruments
817                    for instrument in &instruments {
818                        upsert_instrument(&instruments_cache, instrument.clone());
819                    }
820
821                    let response = DataResponse::Instruments(InstrumentsResponse::new(
822                        request_id,
823                        client_id,
824                        venue,
825                        instruments,
826                        start_nanos,
827                        end_nanos,
828                        clock.get_time_ns(),
829                        params,
830                    ));
831
832                    if let Err(e) = sender.send(DataEvent::Response(response)) {
833                        tracing::error!("Failed to send instruments response: {e}");
834                    }
835                }
836                Err(e) => {
837                    tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
838
839                    // Send empty response on error
840                    let response = DataResponse::Instruments(InstrumentsResponse::new(
841                        request_id,
842                        client_id,
843                        venue,
844                        Vec::new(),
845                        start_nanos,
846                        end_nanos,
847                        clock.get_time_ns(),
848                        params,
849                    ));
850
851                    if let Err(e) = sender.send(DataEvent::Response(response)) {
852                        tracing::error!("Failed to send empty instruments response: {e}");
853                    }
854                }
855            }
856        });
857
858        Ok(())
859    }
860
861    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
862        use nautilus_model::{
863            data::TradeTick,
864            enums::{AggressorSide, OrderSide},
865            identifiers::TradeId,
866        };
867
868        let http = self.http_client.clone();
869        let instruments = self.instruments.clone();
870        let sender = self.data_sender.clone();
871        let instrument_id = request.instrument_id;
872        let start = request.start;
873        let end = request.end;
874        let limit = request.limit.map(|n| n.get() as u32);
875        let request_id = request.request_id;
876        let client_id = request.client_id.unwrap_or(self.client_id);
877        let params = request.params.clone();
878        let clock = self.clock;
879        let start_nanos = datetime_to_unix_nanos(start);
880        let end_nanos = datetime_to_unix_nanos(end);
881
882        get_runtime().spawn(async move {
883            // dYdX Indexer trades endpoint supports `limit` but not an explicit
884            // date range in this client; we approximate by using the provided
885            // limit and instrument metadata for precision.
886            let ticker = instrument_id
887                .symbol
888                .as_str()
889                .trim_end_matches("-PERP")
890                .to_string();
891
892            // Look up instrument to derive price and size precision.
893            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
894                Some(inst) => inst.clone(),
895                None => {
896                    tracing::error!(
897                        "request_trades: instrument {} not found in cache; cannot convert trades",
898                        instrument_id
899                    );
900                    let ts_now = clock.get_time_ns();
901                    let response = DataResponse::Trades(TradesResponse::new(
902                        request_id,
903                        client_id,
904                        instrument_id,
905                        Vec::new(),
906                        start_nanos,
907                        end_nanos,
908                        ts_now,
909                        params,
910                    ));
911                    if let Err(e) = sender.send(DataEvent::Response(response)) {
912                        tracing::error!("Failed to send empty trades response: {e}");
913                    }
914                    return;
915                }
916            };
917
918            let price_precision = instrument.price_precision();
919            let size_precision = instrument.size_precision();
920
921            match http
922                .inner
923                .get_trades(&ticker, limit)
924                .await
925                .context("failed to request trades from dYdX")
926            {
927                Ok(trades_response) => {
928                    let mut ticks = Vec::new();
929
930                    for trade in trades_response.trades {
931                        let aggressor_side = match trade.side {
932                            OrderSide::Buy => AggressorSide::Buyer,
933                            OrderSide::Sell => AggressorSide::Seller,
934                            _ => continue, // Skip unsupported side
935                        };
936
937                        let price = match Price::from_decimal_dp(trade.price, price_precision) {
938                            Ok(p) => p,
939                            Err(e) => {
940                                tracing::warn!(
941                                    "request_trades: failed to convert price for trade {}: {e}",
942                                    trade.id
943                                );
944                                continue;
945                            }
946                        };
947
948                        let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
949                            Ok(q) => q,
950                            Err(e) => {
951                                tracing::warn!(
952                                    "request_trades: failed to convert size for trade {}: {e}",
953                                    trade.id
954                                );
955                                continue;
956                            }
957                        };
958
959                        let ts_event = match trade.created_at.timestamp_nanos_opt() {
960                            Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
961                            _ => {
962                                tracing::warn!(
963                                    "request_trades: timestamp out of range for trade {}",
964                                    trade.id
965                                );
966                                continue;
967                            }
968                        };
969
970                        // Apply optional time-range filter.
971                        if let Some(start_ts) = start_nanos
972                            && ts_event < start_ts
973                        {
974                            continue;
975                        }
976                        if let Some(end_ts) = end_nanos
977                            && ts_event > end_ts
978                        {
979                            continue;
980                        }
981
982                        let tick = TradeTick::new(
983                            instrument_id,
984                            price,
985                            size,
986                            aggressor_side,
987                            TradeId::new(&trade.id),
988                            ts_event,
989                            clock.get_time_ns(),
990                        );
991                        ticks.push(tick);
992                    }
993
994                    let response = DataResponse::Trades(TradesResponse::new(
995                        request_id,
996                        client_id,
997                        instrument_id,
998                        ticks,
999                        start_nanos,
1000                        end_nanos,
1001                        clock.get_time_ns(),
1002                        params,
1003                    ));
1004
1005                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1006                        tracing::error!("Failed to send trades response: {e}");
1007                    }
1008                }
1009                Err(e) => {
1010                    tracing::error!("Trade request failed for {}: {e:?}", instrument_id);
1011
1012                    let response = DataResponse::Trades(TradesResponse::new(
1013                        request_id,
1014                        client_id,
1015                        instrument_id,
1016                        Vec::new(),
1017                        start_nanos,
1018                        end_nanos,
1019                        clock.get_time_ns(),
1020                        params,
1021                    ));
1022
1023                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1024                        tracing::error!("Failed to send empty trades response: {e}");
1025                    }
1026                }
1027            }
1028        });
1029
1030        Ok(())
1031    }
1032
1033    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1034        use nautilus_model::enums::{AggregationSource, BarAggregation, PriceType};
1035
1036        const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
1037
1038        let bar_type = request.bar_type;
1039        let spec = bar_type.spec();
1040
1041        // Validate bar type requirements
1042        if bar_type.aggregation_source() != AggregationSource::External {
1043            anyhow::bail!(
1044                "dYdX only supports EXTERNAL aggregation, got {:?}",
1045                bar_type.aggregation_source()
1046            );
1047        }
1048
1049        if spec.price_type != PriceType::Last {
1050            anyhow::bail!(
1051                "dYdX only supports LAST price type, got {:?}",
1052                spec.price_type
1053            );
1054        }
1055
1056        // Map BarType spec to dYdX resolution
1057        let resolution = match spec.step.get() {
1058            1 => match spec.aggregation {
1059                BarAggregation::Minute => "1MIN",
1060                BarAggregation::Hour => "1HOUR",
1061                BarAggregation::Day => "1DAY",
1062                _ => {
1063                    anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
1064                }
1065            },
1066            5 if spec.aggregation == BarAggregation::Minute => "5MINS",
1067            15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1068            30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1069            4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1070            step => {
1071                anyhow::bail!("Unsupported bar step: {step}");
1072            }
1073        };
1074
1075        let http = self.http_client.clone();
1076        let instruments = self.instruments.clone();
1077        let sender = self.data_sender.clone();
1078        let instrument_id = bar_type.instrument_id();
1079        // dYdX ticker does not include the "-PERP" suffix.
1080        let symbol = instrument_id
1081            .symbol
1082            .as_str()
1083            .trim_end_matches("-PERP")
1084            .to_string();
1085        let request_id = request.request_id;
1086        let client_id = request.client_id.unwrap_or(self.client_id);
1087        let params = request.params.clone();
1088        let clock = self.clock;
1089
1090        let start = request.start;
1091        let end = request.end;
1092        let overall_limit = request.limit.map(|n| n.get() as u32);
1093
1094        // Convert optional datetimes to UnixNanos for response metadata
1095        let start_nanos = datetime_to_unix_nanos(start);
1096        let end_nanos = datetime_to_unix_nanos(end);
1097
1098        // Parse resolution string to DydxCandleResolution enum
1099        let resolution_enum = match resolution {
1100            "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1101            "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1102            "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1103            "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1104            "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1105            "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1106            "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1107            _ => {
1108                anyhow::bail!("Unsupported resolution: {resolution}");
1109            }
1110        };
1111
1112        get_runtime().spawn(async move {
1113            // Determine bar duration in seconds.
1114            let bar_secs: i64 = match spec.aggregation {
1115                BarAggregation::Minute => spec.step.get() as i64 * 60,
1116                BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1117                BarAggregation::Day => spec.step.get() as i64 * 86_400,
1118                _ => {
1119                    tracing::error!(
1120                        "Unsupported aggregation for request_bars: {:?}",
1121                        spec.aggregation
1122                    );
1123                    return;
1124                }
1125            };
1126
1127            // Look up instrument to derive price and size precision.
1128            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1129                Some(inst) => inst.clone(),
1130                None => {
1131                    tracing::error!(
1132                        "request_bars: instrument {} not found in cache; cannot convert candles",
1133                        instrument_id
1134                    );
1135                    let ts_now = clock.get_time_ns();
1136                    let response = DataResponse::Bars(BarsResponse::new(
1137                        request_id,
1138                        client_id,
1139                        bar_type,
1140                        Vec::new(),
1141                        start_nanos,
1142                        end_nanos,
1143                        ts_now,
1144                        params,
1145                    ));
1146                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1147                        tracing::error!("Failed to send empty bars response: {e}");
1148                    }
1149                    return;
1150                }
1151            };
1152
1153            let price_precision = instrument.price_precision();
1154            let size_precision = instrument.size_precision();
1155
1156            let mut all_bars: Vec<Bar> = Vec::new();
1157
1158            // If no explicit date range, fall back to a single request using only `limit`.
1159            let (range_start, range_end) = match (start, end) {
1160                (Some(s), Some(e)) if e > s => (s, e),
1161                _ => {
1162                    let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1163                    match http
1164                        .inner
1165                        .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1166                        .await
1167                    {
1168                        Ok(candles_response) => {
1169                            tracing::debug!(
1170                                "request_bars fetched {} candles without explicit date range",
1171                                candles_response.candles.len()
1172                            );
1173
1174                            for candle in &candles_response.candles {
1175                                match Self::candle_to_bar(
1176                                    candle,
1177                                    bar_type,
1178                                    price_precision,
1179                                    size_precision,
1180                                    bar_secs,
1181                                    clock,
1182                                ) {
1183                                    Ok(bar) => all_bars.push(bar),
1184                                    Err(e) => {
1185                                        tracing::warn!(
1186                                            "Failed to convert dYdX candle to bar for {}: {e}",
1187                                            instrument_id
1188                                        );
1189                                    }
1190                                }
1191                            }
1192
1193                            let current_time_ns = clock.get_time_ns();
1194                            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1195
1196                            let response = DataResponse::Bars(BarsResponse::new(
1197                                request_id,
1198                                client_id,
1199                                bar_type,
1200                                all_bars,
1201                                start_nanos,
1202                                end_nanos,
1203                                current_time_ns,
1204                                params,
1205                            ));
1206
1207                            if let Err(e) = sender.send(DataEvent::Response(response)) {
1208                                tracing::error!("Failed to send bars response: {e}");
1209                            }
1210                        }
1211                        Err(e) => {
1212                            tracing::error!(
1213                                "Failed to request candles for {symbol} without date range: {e:?}"
1214                            );
1215                        }
1216                    }
1217                    return;
1218                }
1219            };
1220
1221            // Calculate expected bars for the range.
1222            let total_secs = (range_end - range_start).num_seconds().max(0);
1223            let expected_bars = (total_secs / bar_secs).max(1) as u64;
1224
1225            tracing::debug!(
1226                "request_bars range {:?} -> {:?}, expected_bars ~= {}",
1227                range_start,
1228                range_end,
1229                expected_bars
1230            );
1231
1232            let mut remaining = overall_limit.unwrap_or(u32::MAX);
1233
1234            // Determine chunk duration using max bars per request.
1235            let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1236            let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1237
1238            let mut chunk_start = range_start;
1239
1240            while chunk_start < range_end && remaining > 0 {
1241                let mut chunk_end = chunk_start + chunk_duration;
1242                if chunk_end > range_end {
1243                    chunk_end = range_end;
1244                }
1245
1246                let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1247
1248                tracing::debug!(
1249                    "request_bars chunk: {} -> {}, limit={}",
1250                    chunk_start,
1251                    chunk_end,
1252                    per_call_limit
1253                );
1254
1255                match http
1256                    .inner
1257                    .get_candles(
1258                        &symbol,
1259                        resolution_enum,
1260                        Some(per_call_limit),
1261                        Some(chunk_start),
1262                        Some(chunk_end),
1263                    )
1264                    .await
1265                {
1266                    Ok(candles_response) => {
1267                        let count = candles_response.candles.len() as u32;
1268
1269                        if count == 0 {
1270                            // No more data available; stop early.
1271                            break;
1272                        }
1273
1274                        // Convert candles to bars and accumulate.
1275                        for candle in &candles_response.candles {
1276                            match Self::candle_to_bar(
1277                                candle,
1278                                bar_type,
1279                                price_precision,
1280                                size_precision,
1281                                bar_secs,
1282                                clock,
1283                            ) {
1284                                Ok(bar) => all_bars.push(bar),
1285                                Err(e) => {
1286                                    tracing::warn!(
1287                                        "Failed to convert dYdX candle to bar for {}: {e}",
1288                                        instrument_id
1289                                    );
1290                                }
1291                            }
1292                        }
1293
1294                        if remaining <= count {
1295                            break;
1296                        } else {
1297                            remaining -= count;
1298                        }
1299                    }
1300                    Err(e) => {
1301                        tracing::error!(
1302                            "Failed to request candles for {symbol} in chunk {:?} -> {:?}: {e:?}",
1303                            chunk_start,
1304                            chunk_end
1305                        );
1306                        break;
1307                    }
1308                }
1309
1310                chunk_start += chunk_duration;
1311            }
1312
1313            tracing::debug!("request_bars completed partitioned fetch for {}", bar_type);
1314
1315            // Filter incomplete bars: only return bars where ts_event < current_time_ns
1316            let current_time_ns = clock.get_time_ns();
1317            all_bars.retain(|bar| bar.ts_event < current_time_ns);
1318
1319            tracing::debug!(
1320                "request_bars filtered to {} completed bars (current_time_ns={})",
1321                all_bars.len(),
1322                current_time_ns
1323            );
1324
1325            let response = DataResponse::Bars(BarsResponse::new(
1326                request_id,
1327                client_id,
1328                bar_type,
1329                all_bars,
1330                start_nanos,
1331                end_nanos,
1332                current_time_ns,
1333                params,
1334            ));
1335
1336            if let Err(e) = sender.send(DataEvent::Response(response)) {
1337                tracing::error!("Failed to send bars response: {e}");
1338            }
1339        });
1340
1341        Ok(())
1342    }
1343}
1344
1345/// Upserts an instrument into the shared cache.
1346fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1347    let symbol = Ustr::from(instrument.id().symbol.as_str());
1348    cache.insert(symbol, instrument);
1349}
1350
1351impl DydxDataClient {
1352    /// Start a task to periodically refresh instruments.
1353    ///
1354    /// This task runs in the background and updates the instrument cache
1355    /// at the configured interval.
1356    ///
1357    /// # Errors
1358    ///
1359    /// Returns an error if a refresh task is already running.
1360    pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1361        let interval_secs = match self.config.instrument_refresh_interval_secs {
1362            Some(secs) if secs > 0 => secs,
1363            _ => {
1364                tracing::info!("Instrument refresh disabled (interval not configured)");
1365                return Ok(());
1366            }
1367        };
1368
1369        let interval = Duration::from_secs(interval_secs);
1370        let http_client = self.http_client.clone();
1371        let instruments_cache = self.instruments.clone();
1372        let ws_client = self.ws_client.clone();
1373        let cancellation_token = self.cancellation_token.clone();
1374
1375        tracing::info!(
1376            "Starting instrument refresh task (interval: {}s)",
1377            interval_secs
1378        );
1379
1380        let task = get_runtime().spawn(async move {
1381            let mut interval_timer = tokio::time::interval(interval);
1382            interval_timer.tick().await; // Skip first immediate tick
1383
1384            loop {
1385                tokio::select! {
1386                    _ = cancellation_token.cancelled() => {
1387                        tracing::info!("Instrument refresh task cancelled");
1388                        break;
1389                    }
1390                    _ = interval_timer.tick() => {
1391                        tracing::debug!("Refreshing instruments");
1392
1393                        // Populates all HTTP cache layers (instruments, clob_pair_id, market_params)
1394                        match http_client.fetch_and_cache_instruments().await {
1395                            Ok(()) => {
1396                                let instruments: Vec<_> = http_client
1397                                    .instruments()
1398                                    .iter()
1399                                    .map(|entry| entry.value().clone())
1400                                    .collect();
1401
1402                                tracing::debug!("Refreshed {} instruments", instruments.len());
1403
1404                                for instrument in &instruments {
1405                                    upsert_instrument(&instruments_cache, instrument.clone());
1406                                }
1407
1408                                // Propagate to WS handler for message parsing
1409                                if let Some(ref ws) = ws_client {
1410                                    ws.cache_instruments(instruments);
1411                                }
1412                            }
1413                            Err(e) => {
1414                                tracing::error!("Failed to refresh instruments: {}", e);
1415                            }
1416                        }
1417                    }
1418                }
1419            }
1420        });
1421
1422        self.tasks.push(task);
1423        Ok(())
1424    }
1425
1426    /// Start a background task to periodically refresh orderbook snapshots.
1427    ///
1428    /// This prevents stale orderbooks from missed WebSocket messages due to:
1429    /// - Network issues or message drops
1430    /// - dYdX validator delays
1431    /// - WebSocket reconnection gaps
1432    ///
1433    /// The task fetches fresh snapshots via HTTP at the configured interval
1434    /// and applies them to the local orderbooks.
1435    fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1436        let interval_secs = match self.config.orderbook_refresh_interval_secs {
1437            Some(secs) if secs > 0 => secs,
1438            _ => {
1439                tracing::info!("Orderbook snapshot refresh disabled (interval not configured)");
1440                return Ok(());
1441            }
1442        };
1443
1444        let interval = Duration::from_secs(interval_secs);
1445        let http_client = self.http_client.clone();
1446        let instruments = self.instruments.clone();
1447        let order_books = self.order_books.clone();
1448        let active_subs = self.active_orderbook_subs.clone();
1449        let cancellation_token = self.cancellation_token.clone();
1450        let data_sender = self.data_sender.clone();
1451
1452        tracing::info!(
1453            "Starting orderbook snapshot refresh task (interval: {}s)",
1454            interval_secs
1455        );
1456
1457        let task = get_runtime().spawn(async move {
1458            let mut interval_timer = tokio::time::interval(interval);
1459            interval_timer.tick().await; // Skip first immediate tick
1460
1461            loop {
1462                tokio::select! {
1463                    _ = cancellation_token.cancelled() => {
1464                        tracing::info!("Orderbook refresh task cancelled");
1465                        break;
1466                    }
1467                    _ = interval_timer.tick() => {
1468                        let active_instruments: Vec<InstrumentId> = active_subs
1469                            .iter()
1470                            .map(|entry| *entry.key())
1471                            .collect();
1472
1473                        if active_instruments.is_empty() {
1474                            tracing::debug!("No active orderbook subscriptions to refresh");
1475                            continue;
1476                        }
1477
1478                        tracing::debug!(
1479                            "Refreshing {} orderbook snapshots",
1480                            active_instruments.len()
1481                        );
1482
1483                        for instrument_id in active_instruments {
1484                            // Get instrument for parsing
1485                            let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1486                                Some(inst) => inst.clone(),
1487                                None => {
1488                                    tracing::warn!(
1489                                        "Cannot refresh orderbook: no instrument for {}",
1490                                        instrument_id
1491                                    );
1492                                    continue;
1493                                }
1494                            };
1495
1496                            // Fetch snapshot via HTTP (strip -PERP suffix for dYdX API)
1497                            let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1498                            let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1499
1500                            let snapshot = match snapshot_result {
1501                                Ok(s) => s,
1502                                Err(e) => {
1503                                    tracing::error!(
1504                                        "Failed to fetch orderbook snapshot for {}: {}",
1505                                        instrument_id,
1506                                        e
1507                                    );
1508                                    continue;
1509                                }
1510                            };
1511
1512                            // Convert HTTP snapshot to OrderBookDeltas
1513                            let deltas_result = Self::parse_orderbook_snapshot(
1514                                instrument_id,
1515                                &snapshot,
1516                                &instrument,
1517                            );
1518
1519                            let deltas = match deltas_result {
1520                                Ok(d) => d,
1521                                Err(e) => {
1522                                    tracing::error!(
1523                                        "Failed to parse orderbook snapshot for {}: {}",
1524                                        instrument_id,
1525                                        e
1526                                    );
1527                                    continue;
1528                                }
1529                            };
1530
1531                            // Apply snapshot to local orderbook
1532                            if let Some(mut book) = order_books.get_mut(&instrument_id) {
1533                                if let Err(e) = book.apply_deltas(&deltas) {
1534                                    tracing::error!(
1535                                        "Failed to apply orderbook snapshot for {}: {}",
1536                                        instrument_id,
1537                                        e
1538                                    );
1539                                    continue;
1540                                }
1541
1542                                tracing::debug!(
1543                                    "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1544                                    instrument_id,
1545                                    book.best_bid_price(),
1546                                    book.best_ask_price()
1547                                );
1548                            }
1549
1550                            // Emit the snapshot deltas
1551                            let data = NautilusData::from(OrderBookDeltas_API::new(deltas));
1552                            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1553                                tracing::error!("Failed to emit orderbook snapshot: {}", e);
1554                            }
1555                        }
1556                    }
1557                }
1558            }
1559        });
1560
1561        self.tasks.push(task);
1562        Ok(())
1563    }
1564
1565    /// Parse HTTP orderbook snapshot into OrderBookDeltas.
1566    ///
1567    /// Converts the REST API orderbook format into Nautilus deltas with CLEAR + ADD actions.
1568    fn parse_orderbook_snapshot(
1569        instrument_id: InstrumentId,
1570        snapshot: &crate::http::models::OrderbookResponse,
1571        instrument: &InstrumentAny,
1572    ) -> anyhow::Result<OrderBookDeltas> {
1573        use nautilus_model::{
1574            data::{BookOrder, OrderBookDelta},
1575            enums::{BookAction, OrderSide, RecordFlag},
1576            instruments::Instrument,
1577            types::{Price, Quantity},
1578        };
1579
1580        let ts_init = get_atomic_clock_realtime().get_time_ns();
1581        let mut deltas = Vec::new();
1582
1583        // Add clear delta first
1584        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1585
1586        let price_precision = instrument.price_precision();
1587        let size_precision = instrument.size_precision();
1588
1589        let bids_len = snapshot.bids.len();
1590        let asks_len = snapshot.asks.len();
1591
1592        // Add bid levels
1593        for (idx, bid) in snapshot.bids.iter().enumerate() {
1594            let is_last = idx == bids_len - 1 && asks_len == 0;
1595            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1596
1597            let price = Price::from_decimal_dp(bid.price, price_precision)
1598                .context("failed to parse bid price")?;
1599            let size = Quantity::from_decimal_dp(bid.size, size_precision)
1600                .context("failed to parse bid size")?;
1601
1602            let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1603            deltas.push(OrderBookDelta::new(
1604                instrument_id,
1605                BookAction::Add,
1606                order,
1607                flags,
1608                0,
1609                ts_init,
1610                ts_init,
1611            ));
1612        }
1613
1614        // Add ask levels
1615        for (idx, ask) in snapshot.asks.iter().enumerate() {
1616            let is_last = idx == asks_len - 1;
1617            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1618
1619            let price = Price::from_decimal_dp(ask.price, price_precision)
1620                .context("failed to parse ask price")?;
1621            let size = Quantity::from_decimal_dp(ask.size, size_precision)
1622                .context("failed to parse ask size")?;
1623
1624            let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1625            deltas.push(OrderBookDelta::new(
1626                instrument_id,
1627                BookAction::Add,
1628                order,
1629                flags,
1630                0,
1631                ts_init,
1632                ts_init,
1633            ));
1634        }
1635
1636        Ok(OrderBookDeltas::new(instrument_id, deltas))
1637    }
1638
1639    /// Get a cached instrument by symbol.
1640    #[must_use]
1641    pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1642        self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1643    }
1644
1645    /// Get all cached instruments.
1646    #[must_use]
1647    pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1648        self.instruments.iter().map(|i| i.clone()).collect()
1649    }
1650
1651    fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1652        self.order_books
1653            .entry(instrument_id)
1654            .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1655    }
1656
1657    /// Get BarType for a given WebSocket candle topic.
1658    #[must_use]
1659    pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1660        self.bar_type_mappings
1661            .get(topic)
1662            .map(|entry| *entry.value())
1663    }
1664
1665    /// Get all registered bar topics.
1666    #[must_use]
1667    pub fn get_bar_topics(&self) -> Vec<String> {
1668        self.bar_type_mappings
1669            .iter()
1670            .map(|entry| entry.key().clone())
1671            .collect()
1672    }
1673
1674    /// Convert a dYdX HTTP candle into a Nautilus [`Bar`].
1675    ///
1676    /// This mirrors the conversion logic used in other adapters (for example
1677    /// Hyperliquid), using the instrument price/size precision and mapping the
1678    /// candle start time to `ts_init` with `ts_event` at the end of the bar
1679    /// interval.
1680    fn candle_to_bar(
1681        candle: &crate::http::models::Candle,
1682        bar_type: BarType,
1683        price_precision: u8,
1684        size_precision: u8,
1685        bar_secs: i64,
1686        clock: &AtomicTime,
1687    ) -> anyhow::Result<Bar> {
1688        use anyhow::Context;
1689
1690        // Convert candle start time to UnixNanos (ts_init).
1691        let ts_init =
1692            datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1693
1694        // Treat ts_event as the end of the bar interval.
1695        let ts_event_ns = ts_init
1696            .as_u64()
1697            .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1698        let ts_event = UnixNanos::from(ts_event_ns);
1699
1700        let open = Price::from_decimal_dp(candle.open, price_precision)
1701            .context("failed to parse candle open price")?;
1702        let high = Price::from_decimal_dp(candle.high, price_precision)
1703            .context("failed to parse candle high price")?;
1704        let low = Price::from_decimal_dp(candle.low, price_precision)
1705            .context("failed to parse candle low price")?;
1706        let close = Price::from_decimal_dp(candle.close, price_precision)
1707            .context("failed to parse candle close price")?;
1708
1709        // Use base token volume as bar volume.
1710        let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1711            .context("failed to parse candle base_token_volume")?;
1712
1713        Ok(Bar::new(
1714            bar_type, open, high, low, close, volume, ts_event, ts_init,
1715        ))
1716    }
1717
1718    fn handle_ws_message(
1719        message: crate::websocket::enums::NautilusWsMessage,
1720        ctx: &WsMessageContext,
1721    ) {
1722        match message {
1723            crate::websocket::enums::NautilusWsMessage::Data(payloads) => {
1724                Self::handle_data_message(payloads, ctx.data_sender, ctx.incomplete_bars);
1725            }
1726            crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
1727                Self::handle_deltas_message(
1728                    *deltas,
1729                    ctx.data_sender,
1730                    ctx.order_books,
1731                    ctx.last_quotes,
1732                    ctx.instruments,
1733                );
1734            }
1735            crate::websocket::enums::NautilusWsMessage::OraclePrices(oracle_prices) => {
1736                Self::handle_oracle_prices(oracle_prices, ctx.instruments, ctx.data_sender);
1737            }
1738            crate::websocket::enums::NautilusWsMessage::Error(err) => {
1739                tracing::error!("dYdX WS error: {err}");
1740            }
1741            crate::websocket::enums::NautilusWsMessage::Reconnected => {
1742                tracing::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1743
1744                // Re-subscribe to all active subscriptions after WebSocket reconnection
1745                if let Some(ws) = ctx.ws_client {
1746                    let total_subs = ctx.active_orderbook_subs.len()
1747                        + ctx.active_trade_subs.len()
1748                        + ctx.active_bar_subs.len();
1749
1750                    if total_subs == 0 {
1751                        tracing::debug!("No active subscriptions to restore");
1752                        return;
1753                    }
1754
1755                    tracing::info!(
1756                        "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1757                        total_subs,
1758                        ctx.active_orderbook_subs.len(),
1759                        ctx.active_trade_subs.len(),
1760                        ctx.active_bar_subs.len()
1761                    );
1762
1763                    // Re-subscribe to orderbook channels
1764                    for entry in ctx.active_orderbook_subs.iter() {
1765                        let instrument_id = *entry.key();
1766                        let ws_clone = ws.clone();
1767                        get_runtime().spawn(async move {
1768                            if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1769                                tracing::error!(
1770                                    "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1771                                );
1772                            } else {
1773                                tracing::debug!("Re-subscribed to orderbook for {instrument_id}");
1774                            }
1775                        });
1776                    }
1777
1778                    // Re-subscribe to trade channels
1779                    for entry in ctx.active_trade_subs.iter() {
1780                        let instrument_id = *entry.key();
1781                        let ws_clone = ws.clone();
1782                        get_runtime().spawn(async move {
1783                            if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1784                                tracing::error!(
1785                                    "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1786                                );
1787                            } else {
1788                                tracing::debug!("Re-subscribed to trades for {instrument_id}");
1789                            }
1790                        });
1791                    }
1792
1793                    // Re-subscribe to candle/bar channels
1794                    for entry in ctx.active_bar_subs.iter() {
1795                        let (instrument_id, resolution) = entry.key();
1796                        let instrument_id = *instrument_id;
1797                        let resolution = resolution.clone();
1798                        let bar_type = *entry.value();
1799                        let ws_clone = ws.clone();
1800
1801                        // Re-register bar type with handler
1802                        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1803                        let topic = format!("{ticker}/{resolution}");
1804                        if let Err(e) = ws.send_command(
1805                            crate::websocket::handler::HandlerCommand::RegisterBarType {
1806                                topic,
1807                                bar_type,
1808                            },
1809                        ) {
1810                            tracing::warn!(
1811                                "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1812                            );
1813                        }
1814
1815                        get_runtime().spawn(async move {
1816                            if let Err(e) =
1817                                ws_clone.subscribe_candles(instrument_id, &resolution).await
1818                            {
1819                                tracing::error!(
1820                                    "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1821                                );
1822                            } else {
1823                                tracing::debug!(
1824                                    "Re-subscribed to candles for {instrument_id} ({resolution})"
1825                                );
1826                            }
1827                        });
1828                    }
1829
1830                    tracing::info!("Completed re-subscription requests after reconnection");
1831                } else {
1832                    tracing::warn!("WebSocket client not available for re-subscription");
1833                }
1834            }
1835            crate::websocket::enums::NautilusWsMessage::BlockHeight(_) => {
1836                tracing::debug!(
1837                    "Ignoring block height message on dYdX data client (handled by execution adapter)"
1838                );
1839            }
1840            crate::websocket::enums::NautilusWsMessage::Order(_)
1841            | crate::websocket::enums::NautilusWsMessage::Fill(_)
1842            | crate::websocket::enums::NautilusWsMessage::Position(_)
1843            | crate::websocket::enums::NautilusWsMessage::AccountState(_)
1844            | crate::websocket::enums::NautilusWsMessage::SubaccountSubscribed(_)
1845            | crate::websocket::enums::NautilusWsMessage::SubaccountsChannelData(_) => {
1846                tracing::debug!(
1847                    "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1848                );
1849            }
1850        }
1851    }
1852
1853    fn handle_data_message(
1854        payloads: Vec<NautilusData>,
1855        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1856        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1857    ) {
1858        for data in payloads {
1859            // Filter bars through incomplete bars cache
1860            if let NautilusData::Bar(bar) = data {
1861                Self::handle_bar_message(bar, data_sender, incomplete_bars);
1862            } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1863                tracing::error!("Failed to emit data event: {e}");
1864            }
1865        }
1866    }
1867
1868    /// Handles bar messages by tracking incomplete bars and only emitting completed ones.
1869    ///
1870    /// WebSocket candle updates arrive continuously. This method:
1871    /// - Caches bars where ts_event > current_time (incomplete)
1872    /// - Emits bars where ts_event <= current_time (complete)
1873    /// - Updates cached incomplete bars with latest data
1874    fn handle_bar_message(
1875        bar: Bar,
1876        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1877        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1878    ) {
1879        let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1880        let bar_type = bar.bar_type;
1881
1882        if bar.ts_event <= current_time_ns {
1883            // Bar is complete - emit it and remove from incomplete cache
1884            incomplete_bars.remove(&bar_type);
1885            if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1886                tracing::error!("Failed to emit completed bar: {e}");
1887            }
1888        } else {
1889            // Bar is incomplete - cache it (updates existing entry)
1890            tracing::trace!(
1891                "Caching incomplete bar for {} (ts_event={}, current={})",
1892                bar_type,
1893                bar.ts_event,
1894                current_time_ns
1895            );
1896            incomplete_bars.insert(bar_type, bar);
1897        }
1898    }
1899
1900    /// Resolves a crossed order book by generating synthetic deltas to uncross it.
1901    ///
1902    /// dYdX order books can become crossed due to:
1903    /// - Validator delays in order acknowledgment across the network
1904    /// - Missed or delayed WebSocket messages from the venue
1905    ///
1906    /// This function detects when bid_price >= ask_price and iteratively removes
1907    /// the smaller side while adjusting the larger side until the book is uncrossed.
1908    ///
1909    /// # Algorithm
1910    ///
1911    /// For each crossed level:
1912    /// - If bid_size > ask_size: DELETE ask, UPDATE bid (reduce by ask_size)
1913    /// - If bid_size < ask_size: DELETE bid, UPDATE ask (reduce by bid_size)
1914    /// - If bid_size == ask_size: DELETE both bid and ask
1915    ///
1916    /// The algorithm continues until no more crosses exist or the book is empty.
1917    fn resolve_crossed_order_book(
1918        book: &mut OrderBook,
1919        venue_deltas: OrderBookDeltas,
1920        instrument: &InstrumentAny,
1921    ) -> anyhow::Result<OrderBookDeltas> {
1922        let instrument_id = venue_deltas.instrument_id;
1923        let ts_init = venue_deltas.ts_init;
1924        let mut all_deltas = venue_deltas.deltas.clone();
1925
1926        // Apply the original venue deltas first
1927        book.apply_deltas(&venue_deltas)?;
1928
1929        // Check if orderbook is crossed
1930        let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1931            (book.best_bid_price(), book.best_ask_price())
1932        {
1933            bid_price >= ask_price
1934        } else {
1935            false
1936        };
1937
1938        // Iteratively uncross the orderbook
1939        while is_crossed {
1940            tracing::debug!(
1941                "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1942                instrument_id,
1943                book.best_bid_price(),
1944                book.best_ask_price()
1945            );
1946
1947            let bid_price = match book.best_bid_price() {
1948                Some(p) => p,
1949                None => break,
1950            };
1951            let ask_price = match book.best_ask_price() {
1952                Some(p) => p,
1953                None => break,
1954            };
1955            let bid_size = match book.best_bid_size() {
1956                Some(s) => s,
1957                None => break,
1958            };
1959            let ask_size = match book.best_ask_size() {
1960                Some(s) => s,
1961                None => break,
1962            };
1963
1964            let mut temp_deltas = Vec::new();
1965
1966            if bid_size > ask_size {
1967                // Remove ask level, reduce bid level
1968                let new_bid_size = Quantity::new(
1969                    bid_size.as_f64() - ask_size.as_f64(),
1970                    instrument.size_precision(),
1971                );
1972                temp_deltas.push(OrderBookDelta::new(
1973                    instrument_id,
1974                    BookAction::Update,
1975                    BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1976                    0,
1977                    0,
1978                    ts_init,
1979                    ts_init,
1980                ));
1981                temp_deltas.push(OrderBookDelta::new(
1982                    instrument_id,
1983                    BookAction::Delete,
1984                    BookOrder::new(
1985                        OrderSide::Sell,
1986                        ask_price,
1987                        Quantity::new(0.0, instrument.size_precision()),
1988                        0,
1989                    ),
1990                    0,
1991                    0,
1992                    ts_init,
1993                    ts_init,
1994                ));
1995            } else if bid_size < ask_size {
1996                // Remove bid level, reduce ask level
1997                let new_ask_size = Quantity::new(
1998                    ask_size.as_f64() - bid_size.as_f64(),
1999                    instrument.size_precision(),
2000                );
2001                temp_deltas.push(OrderBookDelta::new(
2002                    instrument_id,
2003                    BookAction::Update,
2004                    BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
2005                    0,
2006                    0,
2007                    ts_init,
2008                    ts_init,
2009                ));
2010                temp_deltas.push(OrderBookDelta::new(
2011                    instrument_id,
2012                    BookAction::Delete,
2013                    BookOrder::new(
2014                        OrderSide::Buy,
2015                        bid_price,
2016                        Quantity::new(0.0, instrument.size_precision()),
2017                        0,
2018                    ),
2019                    0,
2020                    0,
2021                    ts_init,
2022                    ts_init,
2023                ));
2024            } else {
2025                // Equal sizes: remove both levels
2026                temp_deltas.push(OrderBookDelta::new(
2027                    instrument_id,
2028                    BookAction::Delete,
2029                    BookOrder::new(
2030                        OrderSide::Buy,
2031                        bid_price,
2032                        Quantity::new(0.0, instrument.size_precision()),
2033                        0,
2034                    ),
2035                    0,
2036                    0,
2037                    ts_init,
2038                    ts_init,
2039                ));
2040                temp_deltas.push(OrderBookDelta::new(
2041                    instrument_id,
2042                    BookAction::Delete,
2043                    BookOrder::new(
2044                        OrderSide::Sell,
2045                        ask_price,
2046                        Quantity::new(0.0, instrument.size_precision()),
2047                        0,
2048                    ),
2049                    0,
2050                    0,
2051                    ts_init,
2052                    ts_init,
2053                ));
2054            }
2055
2056            // Apply temporary deltas to the book
2057            let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
2058            book.apply_deltas(&temp_deltas_obj)?;
2059            all_deltas.extend(temp_deltas);
2060
2061            // Check if still crossed
2062            is_crossed = if let (Some(bid_price), Some(ask_price)) =
2063                (book.best_bid_price(), book.best_ask_price())
2064            {
2065                bid_price >= ask_price
2066            } else {
2067                false
2068            };
2069        }
2070
2071        // Set F_LAST flag on the final delta
2072        if let Some(last_delta) = all_deltas.last_mut() {
2073            last_delta.flags = RecordFlag::F_LAST as u8;
2074        }
2075
2076        Ok(OrderBookDeltas::new(instrument_id, all_deltas))
2077    }
2078
2079    fn handle_deltas_message(
2080        deltas: OrderBookDeltas,
2081        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2082        order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
2083        last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
2084        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2085    ) {
2086        let instrument_id = deltas.instrument_id;
2087
2088        // Get instrument for crossed orderbook resolution
2089        let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
2090            Some(inst) => inst.clone(),
2091            None => {
2092                tracing::error!(
2093                    "Cannot resolve crossed order book: no instrument for {instrument_id}"
2094                );
2095                // Still emit the raw deltas even without instrument
2096                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
2097                    OrderBookDeltas_API::new(deltas),
2098                ))) {
2099                    tracing::error!("Failed to emit order book deltas: {e}");
2100                }
2101                return;
2102            }
2103        };
2104
2105        // Get or create order book
2106        let mut book = order_books
2107            .entry(instrument_id)
2108            .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
2109
2110        // Resolve crossed orderbook (applies deltas internally)
2111        let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2112        {
2113            Ok(d) => d,
2114            Err(e) => {
2115                tracing::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2116                return;
2117            }
2118        };
2119
2120        // Generate QuoteTick from updated top-of-book
2121        // Edge case: If orderbook is empty after deltas, fall back to last quote
2122        let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2123            (book.best_bid_price(), book.best_ask_price())
2124            && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2125        {
2126            Some(QuoteTick::new(
2127                instrument_id,
2128                bid_price,
2129                ask_price,
2130                bid_size,
2131                ask_size,
2132                resolved_deltas.ts_event,
2133                resolved_deltas.ts_init,
2134            ))
2135        } else {
2136            // Edge case: Empty orderbook levels - use last quote as fallback
2137            if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2138                tracing::debug!(
2139                    "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2140                );
2141                last_quotes.get(&instrument_id).map(|q| *q)
2142            } else {
2143                None
2144            }
2145        };
2146
2147        if let Some(quote) = quote_opt {
2148            // Only emit when top-of-book changes
2149            let emit_quote =
2150                !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2151
2152            if emit_quote {
2153                last_quotes.insert(instrument_id, quote);
2154                if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2155                    tracing::error!("Failed to emit quote tick: {e}");
2156                }
2157            }
2158        } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2159            // Partial orderbook (only one side) - log but don't emit
2160            tracing::debug!(
2161                "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2162                book.best_bid_price(),
2163                book.best_ask_price()
2164            );
2165        }
2166
2167        // Emit the resolved order book deltas
2168        let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2169        if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2170            tracing::error!("Failed to emit order book deltas event: {e}");
2171        }
2172    }
2173
2174    fn handle_oracle_prices(
2175        oracle_prices: std::collections::HashMap<
2176            String,
2177            crate::websocket::messages::DydxOraclePriceMarket,
2178        >,
2179        instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2180        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2181    ) {
2182        let ts_init = get_atomic_clock_realtime().get_time_ns();
2183
2184        for (symbol_str, oracle_market) in oracle_prices {
2185            let symbol = Ustr::from(&symbol_str);
2186
2187            // Get instrument to access instrument_id
2188            let Some(instrument) = instruments.get(&symbol) else {
2189                tracing::debug!(
2190                    symbol = %symbol,
2191                    "Received oracle price for unknown instrument (not cached yet)"
2192                );
2193                continue;
2194            };
2195
2196            let instrument_id = instrument.id();
2197
2198            // Parse oracle price string to Price
2199            let oracle_price_str = &oracle_market.oracle_price;
2200            let Ok(oracle_price_f64) = oracle_price_str.parse::<f64>() else {
2201                tracing::error!(
2202                    symbol = %symbol,
2203                    price_str = %oracle_price_str,
2204                    "Failed to parse oracle price as f64"
2205                );
2206                continue;
2207            };
2208
2209            let price_precision = instrument.price_precision();
2210            let oracle_price = Price::from_raw(
2211                (oracle_price_f64 * 10_f64.powi(price_precision as i32)) as PriceRaw,
2212                price_precision,
2213            );
2214
2215            let oracle_price_event = DydxOraclePrice::new(
2216                instrument_id,
2217                oracle_price,
2218                ts_init, // Use ts_init as ts_event since dYdX doesn't provide event timestamp
2219                ts_init,
2220            );
2221
2222            tracing::debug!(
2223                instrument_id = %instrument_id,
2224                oracle_price = %oracle_price,
2225                "Received dYdX oracle price: {oracle_price_event:?}"
2226            );
2227
2228            let data = NautilusData::IndexPriceUpdate(IndexPriceUpdate::new(
2229                instrument_id,
2230                oracle_price,
2231                ts_init, // Use ts_init as ts_event since dYdX doesn't provide event timestamp
2232                ts_init,
2233            ));
2234
2235            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2236                tracing::error!("Failed to emit oracle price: {e}");
2237            }
2238        }
2239    }
2240}
2241
2242#[cfg(test)]
2243mod tests {
2244    use std::{collections::HashMap, net::SocketAddr, time::Duration};
2245
2246    use axum::{
2247        Router,
2248        extract::{Path, Query, State},
2249        response::Json,
2250        routing::get,
2251    };
2252    use chrono::Utc;
2253    use indexmap::IndexMap;
2254    use nautilus_common::{
2255        live::runner::set_data_event_sender,
2256        messages::{DataEvent, data::DataResponse},
2257        testing::wait_until_async,
2258    };
2259    use nautilus_core::UUID4;
2260    use nautilus_model::{
2261        data::{
2262            BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2263            TradeTick, order::BookOrder,
2264        },
2265        enums::{
2266            AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2267            PriceType,
2268        },
2269        identifiers::{ClientId, InstrumentId, Symbol, Venue},
2270        instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2271        orderbook::OrderBook,
2272        types::{Currency, Price, Quantity},
2273    };
2274    use rstest::rstest;
2275    use rust_decimal::Decimal;
2276    use rust_decimal_macros::dec;
2277    use tokio::net::{TcpListener, TcpStream};
2278
2279    use super::*;
2280    use crate::http::models::{Candle, CandlesResponse};
2281
2282    fn setup_test_env() {
2283        // Initialize data event sender for tests
2284        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2285        set_data_event_sender(sender);
2286    }
2287
2288    async fn wait_for_server(addr: SocketAddr) {
2289        wait_until_async(
2290            || async move { TcpStream::connect(addr).await.is_ok() },
2291            Duration::from_secs(5),
2292        )
2293        .await;
2294    }
2295
2296    #[rstest]
2297    fn test_new_data_client() {
2298        setup_test_env();
2299
2300        let client_id = ClientId::from("DYDX-001");
2301        let config = DydxDataClientConfig::default();
2302        let http_client = DydxHttpClient::default();
2303
2304        let client = DydxDataClient::new(client_id, config, http_client, None);
2305        assert!(client.is_ok());
2306
2307        let client = client.unwrap();
2308        assert_eq!(client.client_id(), client_id);
2309        assert_eq!(client.venue(), *DYDX_VENUE);
2310        assert!(!client.is_connected());
2311    }
2312
2313    #[tokio::test]
2314    async fn test_data_client_lifecycle() {
2315        setup_test_env();
2316
2317        let client_id = ClientId::from("DYDX-001");
2318        let config = DydxDataClientConfig::default();
2319        let http_client = DydxHttpClient::default();
2320
2321        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2322
2323        // Test start
2324        assert!(client.start().is_ok());
2325
2326        // Test stop
2327        assert!(client.stop().is_ok());
2328        assert!(!client.is_connected());
2329
2330        // Test reset
2331        assert!(client.reset().is_ok());
2332
2333        // Test dispose
2334        assert!(client.dispose().is_ok());
2335    }
2336
2337    #[rstest]
2338    fn test_subscribe_unsubscribe_instruments_noop() {
2339        setup_test_env();
2340
2341        let client_id = ClientId::from("DYDX-TEST");
2342        let config = DydxDataClientConfig::default();
2343        let http_client = DydxHttpClient::default();
2344
2345        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2346
2347        let venue = *DYDX_VENUE;
2348        let command_id = UUID4::new();
2349        let ts_init = get_atomic_clock_realtime().get_time_ns();
2350
2351        let subscribe = SubscribeInstruments {
2352            client_id: Some(client_id),
2353            venue,
2354            command_id,
2355            ts_init,
2356            params: None,
2357        };
2358        let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None);
2359
2360        // No-op methods should succeed even without a WebSocket client.
2361        assert!(client.subscribe_instruments(&subscribe).is_ok());
2362        assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2363    }
2364
2365    #[rstest]
2366    fn test_bar_type_mappings_registration() {
2367        setup_test_env();
2368
2369        let client_id = ClientId::from("DYDX-TEST");
2370        let config = DydxDataClientConfig::default();
2371        let http_client = DydxHttpClient::default();
2372
2373        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2374
2375        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2376        let spec = BarSpecification {
2377            step: std::num::NonZeroUsize::new(1).unwrap(),
2378            aggregation: BarAggregation::Minute,
2379            price_type: PriceType::Last,
2380        };
2381        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2382
2383        // Initially no topics registered
2384        assert!(client.get_bar_topics().is_empty());
2385        assert!(client.get_bar_type_for_topic("BTC-USD/1MIN").is_none());
2386
2387        // Register topic
2388        client
2389            .bar_type_mappings
2390            .insert("BTC-USD/1MIN".to_string(), bar_type);
2391
2392        // Verify registration
2393        assert_eq!(client.get_bar_topics().len(), 1);
2394        assert!(
2395            client
2396                .get_bar_topics()
2397                .contains(&"BTC-USD/1MIN".to_string())
2398        );
2399        assert_eq!(
2400            client.get_bar_type_for_topic("BTC-USD/1MIN"),
2401            Some(bar_type)
2402        );
2403
2404        // Register another topic
2405        let spec_5min = BarSpecification {
2406            step: std::num::NonZeroUsize::new(5).unwrap(),
2407            aggregation: BarAggregation::Minute,
2408            price_type: PriceType::Last,
2409        };
2410        let bar_type_5min = BarType::new(instrument_id, spec_5min, AggregationSource::External);
2411        client
2412            .bar_type_mappings
2413            .insert("BTC-USD/5MINS".to_string(), bar_type_5min);
2414
2415        // Verify multiple topics
2416        assert_eq!(client.get_bar_topics().len(), 2);
2417        assert_eq!(
2418            client.get_bar_type_for_topic("BTC-USD/5MINS"),
2419            Some(bar_type_5min)
2420        );
2421    }
2422
2423    #[rstest]
2424    fn test_bar_type_mappings_unregistration() {
2425        setup_test_env();
2426
2427        let client_id = ClientId::from("DYDX-TEST");
2428        let config = DydxDataClientConfig::default();
2429        let http_client = DydxHttpClient::default();
2430
2431        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2432
2433        let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
2434        let spec = BarSpecification {
2435            step: std::num::NonZeroUsize::new(1).unwrap(),
2436            aggregation: BarAggregation::Hour,
2437            price_type: PriceType::Last,
2438        };
2439        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2440
2441        // Register topic
2442        client
2443            .bar_type_mappings
2444            .insert("ETH-USD/1HOUR".to_string(), bar_type);
2445        assert_eq!(client.get_bar_topics().len(), 1);
2446
2447        // Unregister topic
2448        client.bar_type_mappings.remove("ETH-USD/1HOUR");
2449
2450        // Verify unregistration
2451        assert!(client.get_bar_topics().is_empty());
2452        assert!(client.get_bar_type_for_topic("ETH-USD/1HOUR").is_none());
2453    }
2454
2455    #[rstest]
2456    fn test_bar_type_mappings_lookup_nonexistent() {
2457        setup_test_env();
2458
2459        let client_id = ClientId::from("DYDX-TEST");
2460        let config = DydxDataClientConfig::default();
2461        let http_client = DydxHttpClient::default();
2462
2463        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2464
2465        // Lookup non-existent topic
2466        assert!(client.get_bar_type_for_topic("NONEXISTENT/1MIN").is_none());
2467        assert!(client.get_bar_topics().is_empty());
2468    }
2469
2470    #[tokio::test]
2471    async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2472        setup_test_env();
2473
2474        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2475        let instruments = Arc::new(DashMap::new());
2476        let order_books = Arc::new(DashMap::new());
2477        let last_quotes = Arc::new(DashMap::new());
2478        let ws_client: Option<DydxWebSocketClient> = None;
2479        let active_orderbook_subs = Arc::new(DashMap::new());
2480        let active_trade_subs = Arc::new(DashMap::new());
2481        let active_bar_subs = Arc::new(DashMap::new());
2482
2483        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2484        let bar_ts = get_atomic_clock_realtime().get_time_ns();
2485
2486        // Add a test instrument to the cache (required for crossed book resolution)
2487        use nautilus_model::{identifiers::Symbol, instruments::CryptoPerpetual, types::Currency};
2488        let symbol = Symbol::from("BTC-USD-PERP");
2489        let instrument = CryptoPerpetual::new(
2490            instrument_id,
2491            symbol,
2492            Currency::BTC(),
2493            Currency::USD(),
2494            Currency::USD(),
2495            false,
2496            2,
2497            4,
2498            Price::from("0.01"),
2499            Quantity::from("0.0001"),
2500            None,
2501            None,
2502            None,
2503            None,
2504            None,
2505            None,
2506            None,
2507            None,
2508            None,
2509            None,
2510            None,
2511            None,
2512            bar_ts,
2513            bar_ts,
2514        );
2515        instruments.insert(
2516            Ustr::from("BTC-USD-PERP"),
2517            InstrumentAny::CryptoPerpetual(instrument),
2518        );
2519
2520        let price = Price::from("100.00");
2521        let size = Quantity::from("1.0");
2522
2523        // Create both bid and ask deltas to generate a quote
2524        let bid_delta = OrderBookDelta::new(
2525            instrument_id,
2526            BookAction::Add,
2527            BookOrder::new(OrderSide::Buy, price, size, 1),
2528            0,
2529            1,
2530            bar_ts,
2531            bar_ts,
2532        );
2533        let ask_delta = OrderBookDelta::new(
2534            instrument_id,
2535            BookAction::Add,
2536            BookOrder::new(OrderSide::Sell, Price::from("101.00"), size, 1),
2537            0,
2538            1,
2539            bar_ts,
2540            bar_ts,
2541        );
2542        let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2543
2544        let message = crate::websocket::enums::NautilusWsMessage::Deltas(Box::new(deltas));
2545
2546        let incomplete_bars = Arc::new(DashMap::new());
2547        let ctx = WsMessageContext {
2548            data_sender: &sender,
2549            instruments: &instruments,
2550            order_books: &order_books,
2551            last_quotes: &last_quotes,
2552            ws_client: &ws_client,
2553            active_orderbook_subs: &active_orderbook_subs,
2554            active_trade_subs: &active_trade_subs,
2555            active_bar_subs: &active_bar_subs,
2556            incomplete_bars: &incomplete_bars,
2557        };
2558        DydxDataClient::handle_ws_message(message, &ctx);
2559
2560        // Ensure order book was created and top-of-book quote cached.
2561        assert!(order_books.get(&instrument_id).is_some());
2562        assert!(last_quotes.get(&instrument_id).is_some());
2563
2564        // Ensure a quote and deltas Data events were emitted.
2565        let mut saw_quote = false;
2566        let mut saw_deltas = false;
2567
2568        while let Ok(event) = rx.try_recv() {
2569            if let DataEvent::Data(data) = event {
2570                match data {
2571                    NautilusData::Quote(_) => saw_quote = true,
2572                    NautilusData::Deltas(_) => saw_deltas = true,
2573                    _ => {}
2574                }
2575            }
2576        }
2577
2578        assert!(saw_quote);
2579        assert!(saw_deltas);
2580    }
2581
2582    #[rstest]
2583    fn test_handle_ws_message_error_does_not_panic() {
2584        // Ensure malformed/error WebSocket messages are logged and ignored
2585        // without panicking or affecting client state.
2586        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2587        let instruments = Arc::new(DashMap::new());
2588        let order_books = Arc::new(DashMap::new());
2589        let last_quotes = Arc::new(DashMap::new());
2590        let ws_client: Option<DydxWebSocketClient> = None;
2591        let active_orderbook_subs = Arc::new(DashMap::new());
2592        let active_trade_subs = Arc::new(DashMap::new());
2593        let active_bar_subs = Arc::new(DashMap::new());
2594        let incomplete_bars = Arc::new(DashMap::new());
2595
2596        let ctx = WsMessageContext {
2597            data_sender: &sender,
2598            instruments: &instruments,
2599            order_books: &order_books,
2600            last_quotes: &last_quotes,
2601            ws_client: &ws_client,
2602            active_orderbook_subs: &active_orderbook_subs,
2603            active_trade_subs: &active_trade_subs,
2604            active_bar_subs: &active_bar_subs,
2605            incomplete_bars: &incomplete_bars,
2606        };
2607
2608        let err = crate::websocket::error::DydxWebSocketError::from_message(
2609            "malformed WebSocket payload".to_string(),
2610        );
2611
2612        DydxDataClient::handle_ws_message(
2613            crate::websocket::enums::NautilusWsMessage::Error(err),
2614            &ctx,
2615        );
2616    }
2617
2618    #[tokio::test]
2619    async fn test_request_bars_partitioning_math_does_not_panic() {
2620        setup_test_env();
2621
2622        let client_id = ClientId::from("DYDX-BARS");
2623        let config = DydxDataClientConfig::default();
2624        let http_client = DydxHttpClient::default();
2625
2626        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2627
2628        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2629        let spec = BarSpecification {
2630            step: std::num::NonZeroUsize::new(1).unwrap(),
2631            aggregation: BarAggregation::Minute,
2632            price_type: PriceType::Last,
2633        };
2634        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2635
2636        let now = Utc::now();
2637        let start = Some(now - chrono::Duration::hours(10));
2638        let end = Some(now);
2639
2640        let request = RequestBars::new(
2641            bar_type,
2642            start,
2643            end,
2644            None,
2645            Some(client_id),
2646            UUID4::new(),
2647            get_atomic_clock_realtime().get_time_ns(),
2648            None,
2649        );
2650
2651        // We only verify that the partitioning logic executes without panicking;
2652        // HTTP calls are allowed to fail and are handled internally.
2653        assert!(client.request_bars(&request).is_ok());
2654    }
2655
2656    #[tokio::test]
2657    async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2658        setup_test_env();
2659
2660        // Prepare a simple candles response served by a local Axum HTTP server.
2661        let now = Utc::now();
2662        let candle = crate::http::models::Candle {
2663            started_at: now - chrono::Duration::minutes(1),
2664            ticker: "BTC-USD".to_string(),
2665            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2666            open: dec!(100.0),
2667            high: dec!(101.0),
2668            low: dec!(99.0),
2669            close: dec!(100.5),
2670            base_token_volume: dec!(1.0),
2671            usd_volume: dec!(100.0),
2672            trades: 10,
2673            starting_open_interest: dec!(1000.0),
2674        };
2675        let candles_response = crate::http::models::CandlesResponse {
2676            candles: vec![candle],
2677        };
2678        let state = CandlesTestState {
2679            response: Arc::new(candles_response),
2680        };
2681        let addr = start_candles_test_server(state).await;
2682        let base_url = format!("http://{addr}");
2683
2684        let client_id = ClientId::from("DYDX-BARS-MONTHS");
2685        let config = DydxDataClientConfig {
2686            base_url_http: Some(base_url),
2687            is_testnet: true,
2688            ..Default::default()
2689        };
2690
2691        let http_client = DydxHttpClient::new(
2692            config.base_url_http.clone(),
2693            config.http_timeout_secs,
2694            config.http_proxy_url.clone(),
2695            config.is_testnet,
2696            None,
2697        )
2698        .unwrap();
2699
2700        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2701
2702        // Seed instrument cache so request_bars can resolve precision.
2703        let instrument = create_test_instrument_any();
2704        let instrument_id = instrument.id();
2705        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2706        client.instruments.insert(symbol_key, instrument);
2707
2708        let spec = BarSpecification {
2709            step: std::num::NonZeroUsize::new(1).unwrap(),
2710            aggregation: BarAggregation::Minute,
2711            price_type: PriceType::Last,
2712        };
2713        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2714
2715        // Use a date range spanning multiple months to exercise partitioning math.
2716        let start = Some(now - chrono::Duration::days(90));
2717        let end = Some(now);
2718
2719        // Limit the total number of bars so the test completes quickly.
2720        let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2721
2722        let request = RequestBars::new(
2723            bar_type,
2724            start,
2725            end,
2726            limit,
2727            Some(client_id),
2728            UUID4::new(),
2729            get_atomic_clock_realtime().get_time_ns(),
2730            None,
2731        );
2732
2733        assert!(client.request_bars(&request).is_ok());
2734    }
2735
2736    #[derive(Clone)]
2737    struct OrderbookTestState {
2738        snapshot: Arc<crate::http::models::OrderbookResponse>,
2739    }
2740
2741    #[derive(Clone)]
2742    struct TradesTestState {
2743        response: Arc<crate::http::models::TradesResponse>,
2744        last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
2745        last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
2746    }
2747
2748    #[derive(Clone)]
2749    struct CandlesTestState {
2750        response: Arc<crate::http::models::CandlesResponse>,
2751    }
2752
2753    async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2754        async fn handle_orderbook(
2755            Path(_ticker): Path<String>,
2756            State(state): State<OrderbookTestState>,
2757        ) -> Json<crate::http::models::OrderbookResponse> {
2758            Json((*state.snapshot).clone())
2759        }
2760
2761        let router = Router::new().route(
2762            "/v4/orderbooks/perpetualMarket/{ticker}",
2763            get(handle_orderbook).with_state(state),
2764        );
2765
2766        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2767        let addr = listener.local_addr().unwrap();
2768
2769        get_runtime().spawn(async move {
2770            axum::serve(listener, router.into_make_service())
2771                .await
2772                .unwrap();
2773        });
2774
2775        wait_for_server(addr).await;
2776        addr
2777    }
2778
2779    async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2780        async fn handle_trades(
2781            Path(ticker): Path<String>,
2782            Query(params): Query<HashMap<String, String>>,
2783            State(state): State<TradesTestState>,
2784        ) -> Json<crate::http::models::TradesResponse> {
2785            {
2786                let mut last_ticker = state.last_ticker.lock().await;
2787                *last_ticker = Some(ticker);
2788            }
2789
2790            let limit = params
2791                .get("limit")
2792                .and_then(|value| value.parse::<u32>().ok());
2793            {
2794                let mut last_limit = state.last_limit.lock().await;
2795                *last_limit = Some(limit);
2796            }
2797
2798            Json((*state.response).clone())
2799        }
2800
2801        let router = Router::new().route(
2802            "/v4/trades/perpetualMarket/{ticker}",
2803            get(handle_trades).with_state(state),
2804        );
2805
2806        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2807        let addr = listener.local_addr().unwrap();
2808
2809        get_runtime().spawn(async move {
2810            axum::serve(listener, router.into_make_service())
2811                .await
2812                .unwrap();
2813        });
2814
2815        wait_for_server(addr).await;
2816        addr
2817    }
2818
2819    async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2820        async fn handle_candles(
2821            Path(_ticker): Path<String>,
2822            Query(_params): Query<HashMap<String, String>>,
2823            State(state): State<CandlesTestState>,
2824        ) -> Json<crate::http::models::CandlesResponse> {
2825            Json((*state.response).clone())
2826        }
2827
2828        let router = Router::new().route(
2829            "/v4/candles/perpetualMarkets/{ticker}",
2830            get(handle_candles).with_state(state),
2831        );
2832
2833        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2834        let addr = listener.local_addr().unwrap();
2835
2836        get_runtime().spawn(async move {
2837            axum::serve(listener, router.into_make_service())
2838                .await
2839                .unwrap();
2840        });
2841
2842        wait_for_server(addr).await;
2843        addr
2844    }
2845
2846    fn create_test_instrument_any() -> InstrumentAny {
2847        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2848
2849        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2850            instrument_id,
2851            instrument_id.symbol,
2852            Currency::BTC(),
2853            Currency::USD(),
2854            Currency::USD(),
2855            false,
2856            2,                                // price_precision
2857            8,                                // size_precision
2858            Price::new(0.01, 2),              // price_increment
2859            Quantity::new(0.001, 8),          // size_increment
2860            Some(Quantity::new(1.0, 0)),      // multiplier
2861            Some(Quantity::new(0.001, 8)),    // lot_size
2862            Some(Quantity::new(100000.0, 8)), // max_quantity
2863            Some(Quantity::new(0.001, 8)),    // min_quantity
2864            None,                             // max_notional
2865            None,                             // min_notional
2866            Some(Price::new(1000000.0, 2)),   // max_price
2867            Some(Price::new(0.01, 2)),        // min_price
2868            Some(dec!(0.05)),                 // margin_init
2869            Some(dec!(0.03)),                 // margin_maint
2870            Some(dec!(0.0002)),               // maker_fee
2871            Some(dec!(0.0005)),               // taker_fee
2872            UnixNanos::default(),             // ts_event
2873            UnixNanos::default(),             // ts_init
2874        ))
2875    }
2876
2877    // ------------------------------------------------------------------------
2878    // Precision & bar conversion tests
2879    // ------------------------------------------------------------------------
2880
2881    #[tokio::test]
2882    async fn test_candle_to_bar_price_size_edge_cases() {
2883        setup_test_env();
2884
2885        let clock = get_atomic_clock_realtime();
2886        let now = Utc::now();
2887
2888        // Very large prices and sizes (edge cases).
2889        let candle = Candle {
2890            started_at: now,
2891            ticker: "BTC-USD".to_string(),
2892            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2893            open: dec!(123456789.123456),
2894            high: dec!(987654321.987654),  // high is max
2895            low: dec!(123456.789),         // low is min
2896            close: dec!(223456789.123456), // close between low and high
2897            base_token_volume: dec!(0.00000001),
2898            usd_volume: dec!(1234500.0),
2899            trades: 42,
2900            starting_open_interest: dec!(1000.0),
2901        };
2902
2903        let instrument = create_test_instrument_any();
2904        let instrument_id = instrument.id();
2905        let spec = BarSpecification {
2906            step: std::num::NonZeroUsize::new(1).unwrap(),
2907            aggregation: BarAggregation::Minute,
2908            price_type: PriceType::Last,
2909        };
2910        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2911
2912        let bar = DydxDataClient::candle_to_bar(
2913            &candle,
2914            bar_type,
2915            instrument.price_precision(),
2916            instrument.size_precision(),
2917            60,
2918            clock,
2919        )
2920        .expect("candle_to_bar should handle large/scientific values");
2921
2922        assert!(bar.open.as_f64() > 0.0);
2923        assert!(bar.high.as_f64() >= bar.low.as_f64());
2924        assert!(bar.volume.as_f64() > 0.0);
2925    }
2926
2927    #[tokio::test]
2928    async fn test_candle_to_bar_ts_event_overflow_safe() {
2929        setup_test_env();
2930
2931        let clock = get_atomic_clock_realtime();
2932        let now = Utc::now();
2933
2934        let candle = Candle {
2935            started_at: now,
2936            ticker: "BTC-USD".to_string(),
2937            resolution: crate::common::enums::DydxCandleResolution::OneDay,
2938            open: Decimal::from(1),
2939            high: Decimal::from(1),
2940            low: Decimal::from(1),
2941            close: Decimal::from(1),
2942            base_token_volume: Decimal::from(1),
2943            usd_volume: Decimal::from(1),
2944            trades: 1,
2945            starting_open_interest: Decimal::from(1),
2946        };
2947
2948        let instrument = create_test_instrument_any();
2949        let instrument_id = instrument.id();
2950        let spec = BarSpecification {
2951            step: std::num::NonZeroUsize::new(1).unwrap(),
2952            aggregation: BarAggregation::Day,
2953            price_type: PriceType::Last,
2954        };
2955        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2956
2957        // Use an intentionally large bar_secs to exercise saturating_add path.
2958        let bar_secs = i64::MAX / 1_000_000_000;
2959        let bar = DydxDataClient::candle_to_bar(
2960            &candle,
2961            bar_type,
2962            instrument.price_precision(),
2963            instrument.size_precision(),
2964            bar_secs,
2965            clock,
2966        )
2967        .expect("candle_to_bar should not overflow on ts_event");
2968
2969        assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2970    }
2971
2972    #[tokio::test]
2973    async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2974        // Simulate bars with ts_event both before and after current_time_ns and
2975        // ensure only completed bars (ts_event < now) are retained.
2976        let clock = get_atomic_clock_realtime();
2977        let now = Utc::now();
2978
2979        // Use a dedicated data channel for this test and register it
2980        // before constructing the data client.
2981        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2982        set_data_event_sender(sender);
2983
2984        // two candles: one in the past, one in the future
2985        let candle_past = Candle {
2986            started_at: now - chrono::Duration::minutes(2),
2987            ticker: "BTC-USD".to_string(),
2988            resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2989            open: Decimal::from(1),
2990            high: Decimal::from(2),
2991            low: Decimal::from(1),
2992            close: Decimal::from(1),
2993            base_token_volume: Decimal::from(1),
2994            usd_volume: Decimal::from(1),
2995            trades: 1,
2996            starting_open_interest: Decimal::from(1),
2997        };
2998        let candle_future = Candle {
2999            started_at: now + chrono::Duration::minutes(2),
3000            ..candle_past.clone()
3001        };
3002
3003        let candles_response = CandlesResponse {
3004            candles: vec![candle_past, candle_future],
3005        };
3006
3007        let state = CandlesTestState {
3008            response: Arc::new(candles_response),
3009        };
3010        let addr = start_candles_test_server(state).await;
3011        let base_url = format!("http://{addr}");
3012
3013        let client_id = ClientId::from("DYDX-BARS-SKEW");
3014        let config = DydxDataClientConfig {
3015            base_url_http: Some(base_url),
3016            is_testnet: true,
3017            ..Default::default()
3018        };
3019
3020        let http_client = DydxHttpClient::new(
3021            config.base_url_http.clone(),
3022            config.http_timeout_secs,
3023            config.http_proxy_url.clone(),
3024            config.is_testnet,
3025            None,
3026        )
3027        .unwrap();
3028
3029        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3030
3031        let instrument = create_test_instrument_any();
3032        let instrument_id = instrument.id();
3033        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3034        client.instruments.insert(symbol_key, instrument);
3035
3036        let spec = BarSpecification {
3037            step: std::num::NonZeroUsize::new(1).unwrap(),
3038            aggregation: BarAggregation::Minute,
3039            price_type: PriceType::Last,
3040        };
3041        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
3042
3043        let request = RequestBars::new(
3044            bar_type,
3045            Some(now - chrono::Duration::minutes(5)),
3046            Some(now + chrono::Duration::minutes(5)),
3047            None,
3048            Some(client_id),
3049            UUID4::new(),
3050            clock.get_time_ns(),
3051            None,
3052        );
3053
3054        assert!(client.request_bars(&request).is_ok());
3055
3056        let timeout = tokio::time::Duration::from_secs(3);
3057        if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
3058            tokio::time::timeout(timeout, rx.recv()).await
3059        {
3060            // Only the past candle should remain after filtering.
3061            assert_eq!(resp.data.len(), 1);
3062        }
3063    }
3064
3065    #[rstest]
3066    fn test_decimal_to_f64_precision_loss_within_tolerance() {
3067        // Verify converting via Price/Quantity preserves reasonable precision.
3068        let price_value = 12345.125_f64;
3069        let qty_value = 0.00012345_f64;
3070
3071        let price = Price::new(price_value, 6);
3072        let qty = Quantity::new(qty_value, 8);
3073
3074        let price_diff = (price.as_f64() - price_value).abs();
3075        let qty_diff = (qty.as_f64() - qty_value).abs();
3076
3077        // Differences should be well within a tiny epsilon.
3078        assert!(price_diff < 1e-10);
3079        assert!(qty_diff < 1e-12);
3080    }
3081
3082    #[tokio::test]
3083    async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
3084        // Set up a dedicated data event channel for this test.
3085        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3086        set_data_event_sender(sender);
3087
3088        // Prepare a static orderbook snapshot served by a local Axum HTTP server.
3089        let snapshot = crate::http::models::OrderbookResponse {
3090            bids: vec![crate::http::models::OrderbookLevel {
3091                price: dec!(100.0),
3092                size: dec!(1.0),
3093            }],
3094            asks: vec![crate::http::models::OrderbookLevel {
3095                price: dec!(101.0),
3096                size: dec!(2.0),
3097            }],
3098        };
3099        let state = OrderbookTestState {
3100            snapshot: Arc::new(snapshot),
3101        };
3102        let addr = start_orderbook_test_server(state).await;
3103        let base_url = format!("http://{addr}");
3104
3105        // Configure the data client with a short refresh interval and mock HTTP base URL.
3106        let client_id = ClientId::from("DYDX-REFRESH");
3107        let config = DydxDataClientConfig {
3108            is_testnet: true,
3109            base_url_http: Some(base_url),
3110            orderbook_refresh_interval_secs: Some(1),
3111            instrument_refresh_interval_secs: None,
3112            ..Default::default()
3113        };
3114
3115        let http_client = DydxHttpClient::new(
3116            config.base_url_http.clone(),
3117            config.http_timeout_secs,
3118            config.http_proxy_url.clone(),
3119            config.is_testnet,
3120            None,
3121        )
3122        .unwrap();
3123
3124        let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3125
3126        // Seed instruments and orderbook state for a single instrument.
3127        let instrument = create_test_instrument_any();
3128        let instrument_id = instrument.id();
3129        let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3130        client.instruments.insert(symbol_key, instrument);
3131        client.order_books.insert(
3132            instrument_id,
3133            OrderBook::new(instrument_id, BookType::L2_MBP),
3134        );
3135        client.active_orderbook_subs.insert(instrument_id, ());
3136
3137        // Start the refresh task and wait for a snapshot to be applied and emitted.
3138        client.start_orderbook_refresh_task().unwrap();
3139
3140        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3141        let mut saw_snapshot_event = false;
3142
3143        while std::time::Instant::now() < deadline {
3144            if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3145                tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3146            {
3147                saw_snapshot_event = true;
3148                break;
3149            }
3150        }
3151
3152        assert!(
3153            saw_snapshot_event,
3154            "expected at least one snapshot deltas event from refresh task"
3155        );
3156
3157        // Verify that the local orderbook has been updated with the snapshot.
3158        let book = client
3159            .order_books
3160            .get(&instrument_id)
3161            .expect("orderbook should exist after refresh");
3162        let best_bid = book.best_bid_price().expect("best bid should be set");
3163        let best_ask = book.best_ask_price().expect("best ask should be set");
3164
3165        assert_eq!(best_bid, Price::from("100.00"));
3166        assert_eq!(best_ask, Price::from("101.00"));
3167    }
3168
3169    #[rstest]
3170    fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3171        // Test scenario: bid_size > ask_size
3172        // Expected: DELETE ask, UPDATE bid (reduce by ask_size)
3173        let instrument = create_test_instrument_any();
3174        let instrument_id = instrument.id();
3175        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3176        let ts_init = get_atomic_clock_realtime().get_time_ns();
3177
3178        // Create initial non-crossed book
3179        let initial_deltas = vec![
3180            OrderBookDelta::new(
3181                instrument_id,
3182                BookAction::Add,
3183                BookOrder::new(
3184                    OrderSide::Buy,
3185                    Price::from("99.00"),
3186                    Quantity::from("1.0"),
3187                    0,
3188                ),
3189                0,
3190                0,
3191                ts_init,
3192                ts_init,
3193            ),
3194            OrderBookDelta::new(
3195                instrument_id,
3196                BookAction::Add,
3197                BookOrder::new(
3198                    OrderSide::Sell,
3199                    Price::from("101.00"),
3200                    Quantity::from("2.0"),
3201                    0,
3202                ),
3203                0,
3204                0,
3205                ts_init,
3206                ts_init,
3207            ),
3208        ];
3209        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3210            .unwrap();
3211
3212        // Create crossed orderbook: bid @ 102.00 (size 5.0) > ask @ 101.00 (size 2.0)
3213        let crossed_deltas = vec![OrderBookDelta::new(
3214            instrument_id,
3215            BookAction::Add,
3216            BookOrder::new(
3217                OrderSide::Buy,
3218                Price::from("102.00"),
3219                Quantity::from("5.0"),
3220                0,
3221            ),
3222            0,
3223            0,
3224            ts_init,
3225            ts_init,
3226        )];
3227        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3228
3229        let resolved =
3230            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3231                .unwrap();
3232
3233        // Verify resolution: ask @ 101.00 should be deleted
3234        // bid @ 102.00 should remain but reduced (note: precision affects exact value)
3235        assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3236        assert!(book.best_bid_size().unwrap().as_f64() < 5.0); // Reduced from original
3237        assert!(
3238            book.best_ask_price().is_none()
3239                || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3240        ); // No longer crossed
3241
3242        // Verify synthetic deltas were generated
3243        assert!(resolved.deltas.len() > 1); // Original delta + synthetic resolution deltas
3244    }
3245
3246    #[rstest]
3247    fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3248        // Test scenario: bid_size < ask_size
3249        // Expected: DELETE bid, UPDATE ask (reduce by bid_size)
3250        let instrument = create_test_instrument_any();
3251        let instrument_id = instrument.id();
3252        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3253        let ts_init = get_atomic_clock_realtime().get_time_ns();
3254
3255        // Create initial non-crossed book
3256        let initial_deltas = vec![
3257            OrderBookDelta::new(
3258                instrument_id,
3259                BookAction::Add,
3260                BookOrder::new(
3261                    OrderSide::Buy,
3262                    Price::from("99.00"),
3263                    Quantity::from("1.0"),
3264                    0,
3265                ),
3266                0,
3267                0,
3268                ts_init,
3269                ts_init,
3270            ),
3271            OrderBookDelta::new(
3272                instrument_id,
3273                BookAction::Add,
3274                BookOrder::new(
3275                    OrderSide::Sell,
3276                    Price::from("101.00"),
3277                    Quantity::from("5.0"),
3278                    0,
3279                ),
3280                0,
3281                0,
3282                ts_init,
3283                ts_init,
3284            ),
3285        ];
3286        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3287            .unwrap();
3288
3289        // Create crossed orderbook: bid @ 102.00 (size 2.0) < ask @ 101.00 (size 5.0)
3290        let crossed_deltas = vec![OrderBookDelta::new(
3291            instrument_id,
3292            BookAction::Add,
3293            BookOrder::new(
3294                OrderSide::Buy,
3295                Price::from("102.00"),
3296                Quantity::from("2.0"),
3297                0,
3298            ),
3299            0,
3300            0,
3301            ts_init,
3302            ts_init,
3303        )];
3304        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3305
3306        let resolved =
3307            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3308                .unwrap();
3309
3310        // Verify resolution: bid @ 102.00 should be deleted, ask @ 101.00 reduced
3311        assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3312        assert!(book.best_ask_size().unwrap().as_f64() < 5.0); // Reduced from original
3313        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level remains
3314        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); // No longer crossed
3315
3316        // Verify synthetic deltas were generated
3317        assert!(resolved.deltas.len() > 1);
3318    }
3319
3320    #[rstest]
3321    fn test_resolve_crossed_order_book_equal_sizes() {
3322        // Test scenario: bid_size == ask_size
3323        // Expected: DELETE both bid and ask
3324        let instrument = create_test_instrument_any();
3325        let instrument_id = instrument.id();
3326        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3327        let ts_init = get_atomic_clock_realtime().get_time_ns();
3328
3329        // Create initial non-crossed book with multiple levels
3330        let initial_deltas = vec![
3331            OrderBookDelta::new(
3332                instrument_id,
3333                BookAction::Add,
3334                BookOrder::new(
3335                    OrderSide::Buy,
3336                    Price::from("99.00"),
3337                    Quantity::from("1.0"),
3338                    0,
3339                ),
3340                0,
3341                0,
3342                ts_init,
3343                ts_init,
3344            ),
3345            OrderBookDelta::new(
3346                instrument_id,
3347                BookAction::Add,
3348                BookOrder::new(
3349                    OrderSide::Sell,
3350                    Price::from("101.00"),
3351                    Quantity::from("3.0"),
3352                    0,
3353                ),
3354                0,
3355                0,
3356                ts_init,
3357                ts_init,
3358            ),
3359        ];
3360        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3361            .unwrap();
3362
3363        // Create crossed orderbook: bid @ 102.00 (size 3.0) == ask @ 101.00 (size 3.0)
3364        let crossed_deltas = vec![OrderBookDelta::new(
3365            instrument_id,
3366            BookAction::Add,
3367            BookOrder::new(
3368                OrderSide::Buy,
3369                Price::from("102.00"),
3370                Quantity::from("3.0"),
3371                0,
3372            ),
3373            0,
3374            0,
3375            ts_init,
3376            ts_init,
3377        )];
3378        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3379
3380        let resolved =
3381            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3382                .unwrap();
3383
3384        // Verify resolution: both crossed levels should be deleted, reverting to deeper levels
3385        assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); // Next bid level
3386        // Ask at 101.00 should be deleted, book may be empty on ask side or have deeper levels
3387        if let Some(ask_price) = book.best_ask_price() {
3388            assert!(ask_price > book.best_bid_price().unwrap()); // No longer crossed
3389        }
3390
3391        // Verify synthetic deltas were generated
3392        assert!(resolved.deltas.len() > 1);
3393    }
3394
3395    #[rstest]
3396    fn test_resolve_crossed_order_book_multiple_iterations() {
3397        // Test scenario: multiple crossed levels requiring multiple iterations
3398        let instrument = create_test_instrument_any();
3399        let instrument_id = instrument.id();
3400        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3401        let ts_init = get_atomic_clock_realtime().get_time_ns();
3402
3403        // Create initial book with multiple levels on both sides
3404        let initial_deltas = vec![
3405            OrderBookDelta::new(
3406                instrument_id,
3407                BookAction::Add,
3408                BookOrder::new(
3409                    OrderSide::Buy,
3410                    Price::from("98.00"),
3411                    Quantity::from("1.0"),
3412                    0,
3413                ),
3414                0,
3415                0,
3416                ts_init,
3417                ts_init,
3418            ),
3419            OrderBookDelta::new(
3420                instrument_id,
3421                BookAction::Add,
3422                BookOrder::new(
3423                    OrderSide::Sell,
3424                    Price::from("100.00"),
3425                    Quantity::from("1.0"),
3426                    0,
3427                ),
3428                0,
3429                0,
3430                ts_init,
3431                ts_init,
3432            ),
3433            OrderBookDelta::new(
3434                instrument_id,
3435                BookAction::Add,
3436                BookOrder::new(
3437                    OrderSide::Sell,
3438                    Price::from("101.00"),
3439                    Quantity::from("1.0"),
3440                    0,
3441                ),
3442                0,
3443                0,
3444                ts_init,
3445                ts_init,
3446            ),
3447        ];
3448        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3449            .unwrap();
3450
3451        // Create heavily crossed orderbook with multiple bids above asks
3452        let crossed_deltas = vec![
3453            OrderBookDelta::new(
3454                instrument_id,
3455                BookAction::Add,
3456                BookOrder::new(
3457                    OrderSide::Buy,
3458                    Price::from("102.00"),
3459                    Quantity::from("1.0"),
3460                    0,
3461                ),
3462                0,
3463                0,
3464                ts_init,
3465                ts_init,
3466            ),
3467            OrderBookDelta::new(
3468                instrument_id,
3469                BookAction::Add,
3470                BookOrder::new(
3471                    OrderSide::Buy,
3472                    Price::from("103.00"),
3473                    Quantity::from("1.0"),
3474                    0,
3475                ),
3476                0,
3477                0,
3478                ts_init,
3479                ts_init,
3480            ),
3481        ];
3482        let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3483
3484        let resolved =
3485            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3486                .unwrap();
3487
3488        // Verify final state is uncrossed (or book has no asks left)
3489        if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3490            assert!(ask_price > bid_price, "Book should be uncrossed");
3491        }
3492
3493        // Verify multiple synthetic deltas were generated for multiple iterations
3494        assert!(resolved.deltas.len() > 2); // Original deltas + multiple resolution passes
3495    }
3496
3497    #[rstest]
3498    fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3499        // Test scenario: non-crossed orderbook should pass through unchanged
3500        let instrument = create_test_instrument_any();
3501        let instrument_id = instrument.id();
3502        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3503        let ts_init = get_atomic_clock_realtime().get_time_ns();
3504
3505        // Create normal non-crossed book
3506        let initial_deltas = vec![
3507            OrderBookDelta::new(
3508                instrument_id,
3509                BookAction::Add,
3510                BookOrder::new(
3511                    OrderSide::Buy,
3512                    Price::from("99.00"),
3513                    Quantity::from("1.0"),
3514                    0,
3515                ),
3516                0,
3517                0,
3518                ts_init,
3519                ts_init,
3520            ),
3521            OrderBookDelta::new(
3522                instrument_id,
3523                BookAction::Add,
3524                BookOrder::new(
3525                    OrderSide::Sell,
3526                    Price::from("101.00"),
3527                    Quantity::from("1.0"),
3528                    0,
3529                ),
3530                0,
3531                0,
3532                ts_init,
3533                ts_init,
3534            ),
3535        ];
3536        book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3537            .unwrap();
3538
3539        // Add another non-crossing level
3540        let new_deltas = vec![OrderBookDelta::new(
3541            instrument_id,
3542            BookAction::Add,
3543            BookOrder::new(
3544                OrderSide::Buy,
3545                Price::from("98.50"),
3546                Quantity::from("2.0"),
3547                0,
3548            ),
3549            0,
3550            0,
3551            ts_init,
3552            ts_init,
3553        )];
3554        let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3555
3556        let original_bid = book.best_bid_price();
3557        let original_ask = book.best_ask_price();
3558
3559        let resolved =
3560            DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3561                .unwrap();
3562
3563        // Verify no resolution was needed - deltas should be original only
3564        assert_eq!(resolved.deltas.len(), new_deltas.len());
3565        assert_eq!(book.best_bid_price(), original_bid);
3566        assert_eq!(book.best_ask_price(), original_ask);
3567        assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3568    }
3569
3570    #[tokio::test]
3571    async fn test_request_instruments_successful_fetch() {
3572        // Test successful fetch of all instruments
3573        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3574        set_data_event_sender(sender);
3575
3576        let client_id = ClientId::from("DYDX-TEST");
3577        let config = DydxDataClientConfig::default();
3578        let http_client = DydxHttpClient::default();
3579        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3580
3581        let request = RequestInstruments::new(
3582            None,
3583            None,
3584            Some(client_id),
3585            Some(*DYDX_VENUE),
3586            UUID4::new(),
3587            get_atomic_clock_realtime().get_time_ns(),
3588            None,
3589        );
3590
3591        // Execute request (spawns async task)
3592        assert!(client.request_instruments(&request).is_ok());
3593
3594        // Wait for response (with timeout)
3595        let timeout = tokio::time::Duration::from_secs(5);
3596        let result = tokio::time::timeout(timeout, rx.recv()).await;
3597
3598        match result {
3599            Ok(Some(DataEvent::Response(resp))) => {
3600                if let DataResponse::Instruments(inst_resp) = resp {
3601                    // Verify response structure
3602                    assert_eq!(inst_resp.correlation_id, request.request_id);
3603                    assert_eq!(inst_resp.client_id, client_id);
3604                    assert_eq!(inst_resp.venue, *DYDX_VENUE);
3605                    assert!(inst_resp.start.is_none());
3606                    assert!(inst_resp.end.is_none());
3607                    // Note: may be empty if HTTP fails, but structure should be correct
3608                }
3609            }
3610            Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3611            Ok(None) => panic!("Channel closed unexpectedly"),
3612            Err(_) => {
3613                // Timeout is acceptable if testnet is unreachable
3614                println!("Test timed out - testnet may be unreachable");
3615            }
3616        }
3617    }
3618
3619    #[tokio::test]
3620    async fn test_request_instruments_empty_response_on_http_error() {
3621        // Test empty response handling when HTTP call fails
3622        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3623        set_data_event_sender(sender);
3624
3625        let client_id = ClientId::from("DYDX-ERROR-TEST");
3626        let config = DydxDataClientConfig {
3627            base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3628            ..Default::default()
3629        };
3630        let http_client = DydxHttpClient::new(
3631            config.base_url_http.clone(),
3632            config.http_timeout_secs,
3633            config.http_proxy_url.clone(),
3634            config.is_testnet,
3635            None,
3636        )
3637        .unwrap();
3638
3639        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3640
3641        let request = RequestInstruments::new(
3642            None,
3643            None,
3644            Some(client_id),
3645            Some(*DYDX_VENUE),
3646            UUID4::new(),
3647            get_atomic_clock_realtime().get_time_ns(),
3648            None,
3649        );
3650
3651        assert!(client.request_instruments(&request).is_ok());
3652
3653        // Should receive empty response on error
3654        let timeout = tokio::time::Duration::from_secs(3);
3655        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3656            tokio::time::timeout(timeout, rx.recv()).await
3657        {
3658            assert!(
3659                resp.data.is_empty(),
3660                "Expected empty instruments on HTTP error"
3661            );
3662            assert_eq!(resp.correlation_id, request.request_id);
3663            assert_eq!(resp.client_id, client_id);
3664        }
3665    }
3666
3667    #[tokio::test]
3668    async fn test_request_instruments_caching() {
3669        // Test instrument caching after fetch
3670        setup_test_env();
3671
3672        let client_id = ClientId::from("DYDX-CACHE-TEST");
3673        let config = DydxDataClientConfig::default();
3674        let http_client = DydxHttpClient::default();
3675        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3676
3677        let initial_cache_size = client.instruments.len();
3678
3679        let request = RequestInstruments::new(
3680            None,
3681            None,
3682            Some(client_id),
3683            Some(*DYDX_VENUE),
3684            UUID4::new(),
3685            get_atomic_clock_realtime().get_time_ns(),
3686            None,
3687        );
3688
3689        assert!(client.request_instruments(&request).is_ok());
3690
3691        // Wait for async task to complete
3692        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3693
3694        // Verify cache populated (if HTTP succeeded)
3695        let final_cache_size = client.instruments.len();
3696        // Cache should be unchanged (empty) if HTTP failed, or populated if succeeded
3697        // We can't assert exact size without mocking, but can verify no panic
3698        assert!(final_cache_size >= initial_cache_size);
3699    }
3700
3701    #[tokio::test]
3702    async fn test_request_instruments_correlation_id_matching() {
3703        // Test correlation_id matching in response
3704        setup_test_env();
3705
3706        let client_id = ClientId::from("DYDX-CORR-TEST");
3707        let config = DydxDataClientConfig::default();
3708        let http_client = DydxHttpClient::default();
3709        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3710
3711        let request_id = UUID4::new();
3712        let request = RequestInstruments::new(
3713            None,
3714            None,
3715            Some(client_id),
3716            Some(*DYDX_VENUE),
3717            request_id,
3718            get_atomic_clock_realtime().get_time_ns(),
3719            None,
3720        );
3721
3722        // Should execute without panic (actual correlation checked in async handler)
3723        assert!(client.request_instruments(&request).is_ok());
3724    }
3725
3726    #[tokio::test]
3727    async fn test_request_instruments_venue_assignment() {
3728        // Test venue assignment
3729        setup_test_env();
3730
3731        let client_id = ClientId::from("DYDX-VENUE-TEST");
3732        let config = DydxDataClientConfig::default();
3733        let http_client = DydxHttpClient::default();
3734        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3735
3736        assert_eq!(client.venue(), *DYDX_VENUE);
3737
3738        let request = RequestInstruments::new(
3739            None,
3740            None,
3741            Some(client_id),
3742            Some(*DYDX_VENUE),
3743            UUID4::new(),
3744            get_atomic_clock_realtime().get_time_ns(),
3745            None,
3746        );
3747
3748        assert!(client.request_instruments(&request).is_ok());
3749    }
3750
3751    #[tokio::test]
3752    async fn test_request_instruments_timestamp_handling() {
3753        // Test timestamp handling (start_nanos, end_nanos)
3754        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3755        set_data_event_sender(sender);
3756
3757        let client_id = ClientId::from("DYDX-TS-TEST");
3758        let config = DydxDataClientConfig::default();
3759        let http_client = DydxHttpClient::default();
3760        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3761
3762        let now = Utc::now();
3763        let start = Some(now - chrono::Duration::hours(24));
3764        let end = Some(now);
3765
3766        let request = RequestInstruments::new(
3767            start,
3768            end,
3769            Some(client_id),
3770            Some(*DYDX_VENUE),
3771            UUID4::new(),
3772            get_atomic_clock_realtime().get_time_ns(),
3773            None,
3774        );
3775
3776        assert!(client.request_instruments(&request).is_ok());
3777
3778        // Wait for response
3779        let timeout = tokio::time::Duration::from_secs(3);
3780        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3781            tokio::time::timeout(timeout, rx.recv()).await
3782        {
3783            // Verify timestamps are set
3784            assert!(resp.start.unwrap() > 0);
3785            assert!(resp.end.unwrap() > 0);
3786            assert!(resp.start.unwrap() <= resp.end.unwrap());
3787            assert!(resp.ts_init > 0);
3788        }
3789    }
3790
3791    #[tokio::test]
3792    async fn test_request_instruments_with_start_only() {
3793        // Test timestamp handling when only `start` is provided
3794        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3795        set_data_event_sender(sender);
3796
3797        let client_id = ClientId::from("DYDX-TS-START-ONLY");
3798        let config = DydxDataClientConfig::default();
3799        let http_client = DydxHttpClient::default();
3800        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3801
3802        let now = Utc::now();
3803        let start = Some(now - chrono::Duration::hours(24));
3804
3805        let request = RequestInstruments::new(
3806            start,
3807            None,
3808            Some(client_id),
3809            Some(*DYDX_VENUE),
3810            UUID4::new(),
3811            get_atomic_clock_realtime().get_time_ns(),
3812            None,
3813        );
3814
3815        assert!(client.request_instruments(&request).is_ok());
3816
3817        let timeout = tokio::time::Duration::from_secs(3);
3818        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3819            tokio::time::timeout(timeout, rx.recv()).await
3820        {
3821            assert!(resp.start.is_some());
3822            assert!(resp.end.is_none());
3823            assert!(resp.ts_init > 0);
3824        }
3825    }
3826
3827    #[tokio::test]
3828    async fn test_request_instruments_with_end_only() {
3829        // Test timestamp handling when only `end` is provided
3830        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3831        set_data_event_sender(sender);
3832
3833        let client_id = ClientId::from("DYDX-TS-END-ONLY");
3834        let config = DydxDataClientConfig::default();
3835        let http_client = DydxHttpClient::default();
3836        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3837
3838        let now = Utc::now();
3839        let end = Some(now);
3840
3841        let request = RequestInstruments::new(
3842            None,
3843            end,
3844            Some(client_id),
3845            Some(*DYDX_VENUE),
3846            UUID4::new(),
3847            get_atomic_clock_realtime().get_time_ns(),
3848            None,
3849        );
3850
3851        assert!(client.request_instruments(&request).is_ok());
3852
3853        let timeout = tokio::time::Duration::from_secs(3);
3854        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3855            tokio::time::timeout(timeout, rx.recv()).await
3856        {
3857            assert!(resp.start.is_none());
3858            assert!(resp.end.is_some());
3859            assert!(resp.ts_init > 0);
3860        }
3861    }
3862
3863    #[tokio::test]
3864    async fn test_request_instruments_client_id_fallback() {
3865        // Test client_id fallback to default when not provided
3866        setup_test_env();
3867
3868        let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3869        let config = DydxDataClientConfig::default();
3870        let http_client = DydxHttpClient::default();
3871        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3872
3873        let request = RequestInstruments::new(
3874            None,
3875            None,
3876            None, // No client_id provided
3877            Some(*DYDX_VENUE),
3878            UUID4::new(),
3879            get_atomic_clock_realtime().get_time_ns(),
3880            None,
3881        );
3882
3883        // Should use client's default client_id
3884        assert!(client.request_instruments(&request).is_ok());
3885    }
3886
3887    #[tokio::test]
3888    async fn test_request_instruments_with_params() {
3889        // Test custom params handling
3890        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3891        set_data_event_sender(sender);
3892
3893        let client_id = ClientId::from("DYDX-PARAMS-TEST");
3894        let config = DydxDataClientConfig::default();
3895        let http_client = DydxHttpClient::default();
3896        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3897
3898        // Create params - just verify they're passed through
3899        let mut params_map = IndexMap::new();
3900        params_map.insert("test_key".to_string(), "test_value".to_string());
3901
3902        let request = RequestInstruments::new(
3903            None,
3904            None,
3905            Some(client_id),
3906            Some(*DYDX_VENUE),
3907            UUID4::new(),
3908            get_atomic_clock_realtime().get_time_ns(),
3909            Some(params_map),
3910        );
3911
3912        assert!(client.request_instruments(&request).is_ok());
3913
3914        // Wait for response
3915        let timeout = tokio::time::Duration::from_secs(3);
3916        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3917            tokio::time::timeout(timeout, rx.recv()).await
3918        {
3919            // Verify params are propagated into the response
3920            assert_eq!(resp.client_id, client_id);
3921            let params = resp
3922                .params
3923                .expect("expected params to be present in InstrumentsResponse");
3924            assert_eq!(
3925                params.get("test_key").map(String::as_str),
3926                Some("test_value")
3927            );
3928        }
3929    }
3930
3931    #[tokio::test]
3932    async fn test_request_instruments_with_start_and_end_range() {
3933        // Test timestamp handling when both start and end are provided
3934        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3935        set_data_event_sender(sender);
3936
3937        let client_id = ClientId::from("DYDX-START-END-RANGE");
3938        let config = DydxDataClientConfig::default();
3939        let http_client = DydxHttpClient::default();
3940        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3941
3942        let now = Utc::now();
3943        let start = Some(now - chrono::Duration::hours(48));
3944        let end = Some(now - chrono::Duration::hours(24));
3945
3946        let request = RequestInstruments::new(
3947            start,
3948            end,
3949            Some(client_id),
3950            Some(*DYDX_VENUE),
3951            UUID4::new(),
3952            get_atomic_clock_realtime().get_time_ns(),
3953            None,
3954        );
3955
3956        assert!(client.request_instruments(&request).is_ok());
3957
3958        let timeout = tokio::time::Duration::from_secs(3);
3959        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3960            tokio::time::timeout(timeout, rx.recv()).await
3961        {
3962            // Verify both timestamps are present
3963            assert!(
3964                resp.start.is_some(),
3965                "start timestamp should be present when provided"
3966            );
3967            assert!(
3968                resp.end.is_some(),
3969                "end timestamp should be present when provided"
3970            );
3971            assert!(resp.ts_init > 0, "ts_init should always be set");
3972
3973            // Verify start is before end
3974            if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3975                assert!(
3976                    start_ts < end_ts,
3977                    "start timestamp should be before end timestamp"
3978                );
3979            }
3980        }
3981    }
3982
3983    #[tokio::test]
3984    async fn test_request_instruments_different_client_ids() {
3985        // Test that different client_id values are properly handled using a shared channel.
3986        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3987        set_data_event_sender(sender);
3988
3989        let timeout = tokio::time::Duration::from_secs(3);
3990
3991        // First client
3992        let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3993        let config1 = DydxDataClientConfig::default();
3994        let http_client1 = DydxHttpClient::default();
3995        let client1 = DydxDataClient::new(client_id_1, config1, http_client1, None).unwrap();
3996
3997        let request1 = RequestInstruments::new(
3998            None,
3999            None,
4000            Some(client_id_1),
4001            Some(*DYDX_VENUE),
4002            UUID4::new(),
4003            get_atomic_clock_realtime().get_time_ns(),
4004            None,
4005        );
4006
4007        assert!(client1.request_instruments(&request1).is_ok());
4008
4009        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
4010            tokio::time::timeout(timeout, rx.recv()).await
4011        {
4012            assert_eq!(
4013                resp1.client_id, client_id_1,
4014                "Response should contain client_id_1"
4015            );
4016        }
4017
4018        // Second client
4019        let client_id_2 = ClientId::from("DYDX-CLIENT-2");
4020        let config2 = DydxDataClientConfig::default();
4021        let http_client2 = DydxHttpClient::default();
4022        let client2 = DydxDataClient::new(client_id_2, config2, http_client2, None).unwrap();
4023
4024        let request2 = RequestInstruments::new(
4025            None,
4026            None,
4027            Some(client_id_2),
4028            Some(*DYDX_VENUE),
4029            UUID4::new(),
4030            get_atomic_clock_realtime().get_time_ns(),
4031            None,
4032        );
4033
4034        assert!(client2.request_instruments(&request2).is_ok());
4035
4036        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
4037            tokio::time::timeout(timeout, rx.recv()).await
4038        {
4039            assert_eq!(
4040                resp2.client_id, client_id_2,
4041                "Response should contain client_id_2"
4042            );
4043            assert_ne!(
4044                resp2.client_id, client_id_1,
4045                "Different clients should have different client_ids"
4046            );
4047        }
4048    }
4049
4050    #[tokio::test]
4051    async fn test_request_instruments_no_timestamps() {
4052        // Test fetching all current instruments (no start/end filters)
4053        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4054        set_data_event_sender(sender);
4055
4056        let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
4057        let config = DydxDataClientConfig::default();
4058        let http_client = DydxHttpClient::default();
4059        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4060
4061        let request = RequestInstruments::new(
4062            None, // No start filter
4063            None, // No end filter
4064            Some(client_id),
4065            Some(*DYDX_VENUE),
4066            UUID4::new(),
4067            get_atomic_clock_realtime().get_time_ns(),
4068            None,
4069        );
4070
4071        assert!(client.request_instruments(&request).is_ok());
4072
4073        let timeout = tokio::time::Duration::from_secs(5);
4074        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4075            tokio::time::timeout(timeout, rx.recv()).await
4076        {
4077            // Verify no timestamp filters
4078            assert!(
4079                resp.start.is_none(),
4080                "start should be None when not provided"
4081            );
4082            assert!(resp.end.is_none(), "end should be None when not provided");
4083
4084            // Should still get current instruments
4085            assert_eq!(resp.venue, *DYDX_VENUE);
4086            assert_eq!(resp.client_id, client_id);
4087            assert!(resp.ts_init > 0);
4088        }
4089    }
4090
4091    #[tokio::test]
4092    async fn test_request_instrument_cache_hit() {
4093        // Test cache hit (instrument already cached)
4094        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4095        set_data_event_sender(sender);
4096
4097        let client_id = ClientId::from("DYDX-CACHE-HIT");
4098        let config = DydxDataClientConfig::default();
4099        let http_client = DydxHttpClient::default();
4100        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4101
4102        // Pre-populate cache with test instrument
4103        let instrument = create_test_instrument_any();
4104        let instrument_id = instrument.id();
4105        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4106        client.instruments.insert(symbol_key, instrument.clone());
4107
4108        let request = RequestInstrument::new(
4109            instrument_id,
4110            None,
4111            None,
4112            Some(client_id),
4113            UUID4::new(),
4114            get_atomic_clock_realtime().get_time_ns(),
4115            None,
4116        );
4117
4118        assert!(client.request_instrument(&request).is_ok());
4119
4120        // Should get immediate response from cache
4121        let timeout = tokio::time::Duration::from_millis(500);
4122        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4123            tokio::time::timeout(timeout, rx.recv()).await
4124        {
4125            assert_eq!(resp.instrument_id, instrument_id);
4126            assert_eq!(resp.client_id, client_id);
4127            assert_eq!(resp.data.id(), instrument_id);
4128        }
4129    }
4130
4131    #[tokio::test]
4132    async fn test_request_instrument_cache_miss() {
4133        // Test cache miss (fetch from API)
4134        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4135        set_data_event_sender(sender);
4136
4137        let client_id = ClientId::from("DYDX-CACHE-MISS");
4138        let config = DydxDataClientConfig::default();
4139        let http_client = DydxHttpClient::default();
4140        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4141
4142        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4143
4144        let request = RequestInstrument::new(
4145            instrument_id,
4146            None,
4147            None,
4148            Some(client_id),
4149            UUID4::new(),
4150            get_atomic_clock_realtime().get_time_ns(),
4151            None,
4152        );
4153
4154        assert!(client.request_instrument(&request).is_ok());
4155
4156        // Wait for async HTTP fetch and response
4157        let timeout = tokio::time::Duration::from_secs(5);
4158        let result = tokio::time::timeout(timeout, rx.recv()).await;
4159
4160        // May timeout if testnet unreachable, but should not panic
4161        match result {
4162            Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4163                assert_eq!(resp.instrument_id, instrument_id);
4164                assert_eq!(resp.client_id, client_id);
4165            }
4166            Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4167            Ok(None) => panic!("Channel closed unexpectedly"),
4168            Err(_) => {
4169                println!("Test timed out - testnet may be unreachable");
4170            }
4171        }
4172    }
4173
4174    #[tokio::test]
4175    async fn test_request_instrument_not_found() {
4176        // Test instrument not found scenario
4177        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4178        set_data_event_sender(sender);
4179
4180        let client_id = ClientId::from("DYDX-NOT-FOUND");
4181        let config = DydxDataClientConfig {
4182            base_url_http: Some("http://invalid-url.local".to_string()),
4183            ..Default::default()
4184        };
4185        let http_client = DydxHttpClient::new(
4186            config.base_url_http.clone(),
4187            config.http_timeout_secs,
4188            config.http_proxy_url.clone(),
4189            config.is_testnet,
4190            None,
4191        )
4192        .unwrap();
4193
4194        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4195
4196        let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4197
4198        let request = RequestInstrument::new(
4199            instrument_id,
4200            None,
4201            None,
4202            Some(client_id),
4203            UUID4::new(),
4204            get_atomic_clock_realtime().get_time_ns(),
4205            None,
4206        );
4207
4208        // Should not panic on invalid instrument
4209        assert!(client.request_instrument(&request).is_ok());
4210
4211        // Note: No response sent when instrument not found (by design)
4212        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4213    }
4214
4215    #[tokio::test]
4216    async fn test_request_instrument_bulk_caching() {
4217        // Test bulk caching when fetching from API
4218        setup_test_env();
4219
4220        let client_id = ClientId::from("DYDX-BULK-CACHE");
4221        let config = DydxDataClientConfig::default();
4222        let http_client = DydxHttpClient::default();
4223        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4224
4225        let initial_cache_size = client.instruments.len();
4226
4227        let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4228
4229        let request = RequestInstrument::new(
4230            instrument_id,
4231            None,
4232            None,
4233            Some(client_id),
4234            UUID4::new(),
4235            get_atomic_clock_realtime().get_time_ns(),
4236            None,
4237        );
4238
4239        assert!(client.request_instrument(&request).is_ok());
4240
4241        // Wait for async bulk fetch
4242        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4243
4244        // Verify cache populated with all instruments (if HTTP succeeded)
4245        let final_cache_size = client.instruments.len();
4246        assert!(final_cache_size >= initial_cache_size);
4247        // If HTTP succeeded, cache should have multiple instruments
4248    }
4249
4250    #[tokio::test]
4251    async fn test_request_instrument_correlation_id() {
4252        // Test correlation_id matching
4253        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4254        set_data_event_sender(sender);
4255
4256        let client_id = ClientId::from("DYDX-CORR-ID");
4257        let config = DydxDataClientConfig::default();
4258        let http_client = DydxHttpClient::default();
4259        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4260
4261        // Pre-populate cache to get immediate response
4262        let instrument = create_test_instrument_any();
4263        let instrument_id = instrument.id();
4264        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4265        client.instruments.insert(symbol_key, instrument.clone());
4266
4267        let request_id = UUID4::new();
4268        let request = RequestInstrument::new(
4269            instrument_id,
4270            None,
4271            None,
4272            Some(client_id),
4273            request_id,
4274            get_atomic_clock_realtime().get_time_ns(),
4275            None,
4276        );
4277
4278        assert!(client.request_instrument(&request).is_ok());
4279
4280        // Verify correlation_id matches
4281        let timeout = tokio::time::Duration::from_millis(500);
4282        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4283            tokio::time::timeout(timeout, rx.recv()).await
4284        {
4285            assert_eq!(resp.correlation_id, request_id);
4286        }
4287    }
4288
4289    #[tokio::test]
4290    async fn test_request_instrument_response_format_boxed() {
4291        // Verify InstrumentResponse format (boxed)
4292        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4293        set_data_event_sender(sender);
4294
4295        let client_id = ClientId::from("DYDX-BOXED");
4296        let config = DydxDataClientConfig::default();
4297        let http_client = DydxHttpClient::default();
4298        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4299
4300        // Pre-populate cache
4301        let instrument = create_test_instrument_any();
4302        let instrument_id = instrument.id();
4303        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4304        client.instruments.insert(symbol_key, instrument.clone());
4305
4306        let request = RequestInstrument::new(
4307            instrument_id,
4308            None,
4309            None,
4310            Some(client_id),
4311            UUID4::new(),
4312            get_atomic_clock_realtime().get_time_ns(),
4313            None,
4314        );
4315
4316        assert!(client.request_instrument(&request).is_ok());
4317
4318        // Verify response is properly boxed
4319        let timeout = tokio::time::Duration::from_millis(500);
4320        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4321            tokio::time::timeout(timeout, rx.recv()).await
4322        {
4323            // Verify boxed response structure
4324            assert_eq!(boxed_resp.instrument_id, instrument_id);
4325            assert_eq!(boxed_resp.client_id, client_id);
4326            assert!(boxed_resp.start.is_none());
4327            assert!(boxed_resp.end.is_none());
4328            assert!(boxed_resp.ts_init > 0);
4329        }
4330    }
4331
4332    #[rstest]
4333    fn test_request_instrument_symbol_extraction() {
4334        // Test symbol extraction from InstrumentId
4335        setup_test_env();
4336
4337        let client_id = ClientId::from("DYDX-SYMBOL");
4338        let config = DydxDataClientConfig::default();
4339        let http_client = DydxHttpClient::default();
4340        let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4341
4342        // Test various instrument ID formats
4343        // Note: Symbol includes the -PERP suffix in dYdX
4344        let test_cases = vec![
4345            ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4346            ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4347            ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4348        ];
4349
4350        for (instrument_id_str, expected_symbol) in test_cases {
4351            let instrument_id = InstrumentId::from(instrument_id_str);
4352            let symbol = Ustr::from(instrument_id.symbol.as_str());
4353            assert_eq!(symbol.as_str(), expected_symbol);
4354        }
4355    }
4356
4357    #[tokio::test]
4358    async fn test_request_instrument_client_id_fallback() {
4359        // Test client_id fallback to default
4360        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4361        set_data_event_sender(sender);
4362
4363        let client_id = ClientId::from("DYDX-FALLBACK");
4364        let config = DydxDataClientConfig::default();
4365        let http_client = DydxHttpClient::default();
4366        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4367
4368        // Pre-populate cache
4369        let instrument = create_test_instrument_any();
4370        let instrument_id = instrument.id();
4371        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4372        client.instruments.insert(symbol_key, instrument.clone());
4373
4374        let request = RequestInstrument::new(
4375            instrument_id,
4376            None,
4377            None,
4378            None, // No client_id provided
4379            UUID4::new(),
4380            get_atomic_clock_realtime().get_time_ns(),
4381            None,
4382        );
4383
4384        assert!(client.request_instrument(&request).is_ok());
4385
4386        // Should use client's default client_id
4387        let timeout = tokio::time::Duration::from_millis(500);
4388        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4389            tokio::time::timeout(timeout, rx.recv()).await
4390        {
4391            assert_eq!(resp.client_id, client_id);
4392        }
4393    }
4394
4395    #[tokio::test]
4396    async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4397        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4398        set_data_event_sender(sender);
4399
4400        let created_at = Utc::now();
4401
4402        let http_trade = crate::http::models::Trade {
4403            id: "trade-1".to_string(),
4404            side: OrderSide::Buy,
4405            size: dec!(1.5),
4406            price: dec!(100.25),
4407            created_at,
4408            created_at_height: 1,
4409            trade_type: crate::common::enums::DydxTradeType::Limit,
4410        };
4411
4412        let trades_response = crate::http::models::TradesResponse {
4413            trades: vec![http_trade],
4414        };
4415
4416        let state = TradesTestState {
4417            response: Arc::new(trades_response),
4418            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4419            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4420        };
4421
4422        let addr = start_trades_test_server(state.clone()).await;
4423        let base_url = format!("http://{addr}");
4424
4425        let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4426        let config = DydxDataClientConfig {
4427            base_url_http: Some(base_url),
4428            is_testnet: true,
4429            ..Default::default()
4430        };
4431
4432        let http_client = DydxHttpClient::new(
4433            config.base_url_http.clone(),
4434            config.http_timeout_secs,
4435            config.http_proxy_url.clone(),
4436            config.is_testnet,
4437            None,
4438        )
4439        .unwrap();
4440
4441        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4442
4443        let instrument = create_test_instrument_any();
4444        let instrument_id = instrument.id();
4445        let price_precision = instrument.price_precision();
4446        let size_precision = instrument.size_precision();
4447        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4448        client.instruments.insert(symbol_key, instrument);
4449
4450        let request_id = UUID4::new();
4451        let now = Utc::now();
4452        let start = Some(now - chrono::Duration::seconds(10));
4453        let end = Some(now + chrono::Duration::seconds(10));
4454        let limit = std::num::NonZeroUsize::new(100).unwrap();
4455
4456        let request = RequestTrades::new(
4457            instrument_id,
4458            start,
4459            end,
4460            Some(limit),
4461            Some(client_id),
4462            request_id,
4463            get_atomic_clock_realtime().get_time_ns(),
4464            None,
4465        );
4466
4467        assert!(client.request_trades(&request).is_ok());
4468
4469        let timeout = tokio::time::Duration::from_secs(1);
4470        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4471            tokio::time::timeout(timeout, rx.recv()).await
4472        {
4473            assert_eq!(resp.correlation_id, request_id);
4474            assert_eq!(resp.client_id, client_id);
4475            assert_eq!(resp.instrument_id, instrument_id);
4476            assert_eq!(resp.data.len(), 1);
4477
4478            let tick = &resp.data[0];
4479            assert_eq!(tick.instrument_id, instrument_id);
4480            assert_eq!(tick.price, Price::new(100.25, price_precision));
4481            assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4482            assert_eq!(tick.trade_id.to_string(), "trade-1");
4483
4484            use nautilus_model::enums::AggressorSide;
4485            assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4486        } else {
4487            panic!("did not receive trades response in time");
4488        }
4489
4490        // Verify symbol conversion (strip -PERP suffix) and limit propagation.
4491        let last_ticker = state.last_ticker.lock().await.clone();
4492        assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4493
4494        let last_limit = *state.last_limit.lock().await;
4495        assert_eq!(last_limit, Some(Some(100)));
4496    }
4497
4498    #[tokio::test]
4499    async fn test_request_trades_empty_response_and_no_limit() {
4500        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4501        set_data_event_sender(sender);
4502
4503        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4504
4505        let state = TradesTestState {
4506            response: Arc::new(trades_response),
4507            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4508            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4509        };
4510
4511        let addr = start_trades_test_server(state.clone()).await;
4512        let base_url = format!("http://{addr}");
4513
4514        let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4515        let config = DydxDataClientConfig {
4516            base_url_http: Some(base_url),
4517            is_testnet: true,
4518            ..Default::default()
4519        };
4520
4521        let http_client = DydxHttpClient::new(
4522            config.base_url_http.clone(),
4523            config.http_timeout_secs,
4524            config.http_proxy_url.clone(),
4525            config.is_testnet,
4526            None,
4527        )
4528        .unwrap();
4529
4530        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4531
4532        let instrument = create_test_instrument_any();
4533        let instrument_id = instrument.id();
4534        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4535        client.instruments.insert(symbol_key, instrument);
4536
4537        let request_id = UUID4::new();
4538
4539        let request = RequestTrades::new(
4540            instrument_id,
4541            None,
4542            None,
4543            None, // No limit
4544            Some(client_id),
4545            request_id,
4546            get_atomic_clock_realtime().get_time_ns(),
4547            None,
4548        );
4549
4550        assert!(client.request_trades(&request).is_ok());
4551
4552        let timeout = tokio::time::Duration::from_secs(1);
4553        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4554            tokio::time::timeout(timeout, rx.recv()).await
4555        {
4556            assert_eq!(resp.correlation_id, request_id);
4557            assert_eq!(resp.client_id, client_id);
4558            assert_eq!(resp.instrument_id, instrument_id);
4559            assert!(resp.data.is_empty());
4560        } else {
4561            panic!("did not receive trades response in time");
4562        }
4563
4564        // Verify that no `limit` query parameter was sent.
4565        let last_limit = *state.last_limit.lock().await;
4566        assert_eq!(last_limit, Some(None));
4567    }
4568
4569    #[tokio::test]
4570    async fn test_request_trades_timestamp_filtering() {
4571        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4572        set_data_event_sender(sender);
4573
4574        let now = Utc::now();
4575        let trade_before = crate::http::models::Trade {
4576            id: "before".to_string(),
4577            side: OrderSide::Buy,
4578            size: dec!(1.0),
4579            price: dec!(100.0),
4580            created_at: now - chrono::Duration::seconds(60),
4581            created_at_height: 1,
4582            trade_type: crate::common::enums::DydxTradeType::Limit,
4583        };
4584        let trade_inside = crate::http::models::Trade {
4585            id: "inside".to_string(),
4586            side: OrderSide::Sell,
4587            size: dec!(2.0),
4588            price: dec!(101.0),
4589            created_at: now,
4590            created_at_height: 2,
4591            trade_type: crate::common::enums::DydxTradeType::Limit,
4592        };
4593        let trade_after = crate::http::models::Trade {
4594            id: "after".to_string(),
4595            side: OrderSide::Buy,
4596            size: dec!(3.0),
4597            price: dec!(102.0),
4598            created_at: now + chrono::Duration::seconds(60),
4599            created_at_height: 3,
4600            trade_type: crate::common::enums::DydxTradeType::Limit,
4601        };
4602
4603        let trades_response = crate::http::models::TradesResponse {
4604            trades: vec![trade_before, trade_inside.clone(), trade_after],
4605        };
4606
4607        let state = TradesTestState {
4608            response: Arc::new(trades_response),
4609            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4610            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4611        };
4612
4613        let addr = start_trades_test_server(state).await;
4614        let base_url = format!("http://{addr}");
4615
4616        let client_id = ClientId::from("DYDX-TRADES-FILTER");
4617        let config = DydxDataClientConfig {
4618            base_url_http: Some(base_url),
4619            is_testnet: true,
4620            ..Default::default()
4621        };
4622
4623        let http_client = DydxHttpClient::new(
4624            config.base_url_http.clone(),
4625            config.http_timeout_secs,
4626            config.http_proxy_url.clone(),
4627            config.is_testnet,
4628            None,
4629        )
4630        .unwrap();
4631
4632        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4633
4634        let instrument = create_test_instrument_any();
4635        let instrument_id = instrument.id();
4636        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4637        client.instruments.insert(symbol_key, instrument);
4638
4639        let request_id = UUID4::new();
4640
4641        // Filter range includes only the "inside" trade.
4642        let start = Some(now - chrono::Duration::seconds(10));
4643        let end = Some(now + chrono::Duration::seconds(10));
4644
4645        let request = RequestTrades::new(
4646            instrument_id,
4647            start,
4648            end,
4649            None,
4650            Some(client_id),
4651            request_id,
4652            get_atomic_clock_realtime().get_time_ns(),
4653            None,
4654        );
4655
4656        assert!(client.request_trades(&request).is_ok());
4657
4658        let timeout = tokio::time::Duration::from_secs(1);
4659        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4660            tokio::time::timeout(timeout, rx.recv()).await
4661        {
4662            assert_eq!(resp.correlation_id, request_id);
4663            assert_eq!(resp.client_id, client_id);
4664            assert_eq!(resp.instrument_id, instrument_id);
4665            assert_eq!(resp.data.len(), 1);
4666
4667            let tick = &resp.data[0];
4668            assert_eq!(tick.trade_id.to_string(), "inside");
4669            assert_eq!(tick.price.as_decimal(), dec!(101.0));
4670        } else {
4671            panic!("did not receive trades response in time");
4672        }
4673    }
4674
4675    #[tokio::test]
4676    async fn test_request_trades_correlation_id_matching() {
4677        // Test correlation_id matching in response
4678        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4679        set_data_event_sender(sender);
4680
4681        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4682
4683        let state = TradesTestState {
4684            response: Arc::new(trades_response),
4685            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4686            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4687        };
4688
4689        let addr = start_trades_test_server(state).await;
4690        let base_url = format!("http://{addr}");
4691
4692        let client_id = ClientId::from("DYDX-TRADES-CORR");
4693        let config = DydxDataClientConfig {
4694            base_url_http: Some(base_url),
4695            is_testnet: true,
4696            ..Default::default()
4697        };
4698
4699        let http_client = DydxHttpClient::new(
4700            config.base_url_http.clone(),
4701            config.http_timeout_secs,
4702            config.http_proxy_url.clone(),
4703            config.is_testnet,
4704            None,
4705        )
4706        .unwrap();
4707
4708        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4709
4710        let instrument = create_test_instrument_any();
4711        let instrument_id = instrument.id();
4712        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4713        client.instruments.insert(symbol_key, instrument);
4714
4715        let request_id = UUID4::new();
4716        let request = RequestTrades::new(
4717            instrument_id,
4718            None,
4719            None,
4720            None,
4721            Some(client_id),
4722            request_id,
4723            get_atomic_clock_realtime().get_time_ns(),
4724            None,
4725        );
4726
4727        assert!(client.request_trades(&request).is_ok());
4728
4729        let timeout = tokio::time::Duration::from_millis(500);
4730        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4731            tokio::time::timeout(timeout, rx.recv()).await
4732        {
4733            assert_eq!(resp.correlation_id, request_id);
4734        }
4735    }
4736
4737    #[tokio::test]
4738    async fn test_request_trades_response_format() {
4739        // Verify TradesResponse format
4740        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4741        set_data_event_sender(sender);
4742
4743        let created_at = Utc::now();
4744        let http_trade = crate::http::models::Trade {
4745            id: "format-test".to_string(),
4746            side: OrderSide::Sell,
4747            size: dec!(5.0),
4748            price: dec!(200.0),
4749            created_at,
4750            created_at_height: 100,
4751            trade_type: crate::common::enums::DydxTradeType::Limit,
4752        };
4753
4754        let trades_response = crate::http::models::TradesResponse {
4755            trades: vec![http_trade],
4756        };
4757
4758        let state = TradesTestState {
4759            response: Arc::new(trades_response),
4760            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4761            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4762        };
4763
4764        let addr = start_trades_test_server(state).await;
4765        let base_url = format!("http://{addr}");
4766
4767        let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4768        let config = DydxDataClientConfig {
4769            base_url_http: Some(base_url),
4770            is_testnet: true,
4771            ..Default::default()
4772        };
4773
4774        let http_client = DydxHttpClient::new(
4775            config.base_url_http.clone(),
4776            config.http_timeout_secs,
4777            config.http_proxy_url.clone(),
4778            config.is_testnet,
4779            None,
4780        )
4781        .unwrap();
4782
4783        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4784
4785        let instrument = create_test_instrument_any();
4786        let instrument_id = instrument.id();
4787        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4788        client.instruments.insert(symbol_key, instrument);
4789
4790        let request = RequestTrades::new(
4791            instrument_id,
4792            None,
4793            None,
4794            None,
4795            Some(client_id),
4796            UUID4::new(),
4797            get_atomic_clock_realtime().get_time_ns(),
4798            None,
4799        );
4800
4801        assert!(client.request_trades(&request).is_ok());
4802
4803        let timeout = tokio::time::Duration::from_millis(500);
4804        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4805            tokio::time::timeout(timeout, rx.recv()).await
4806        {
4807            // Verify response structure
4808            assert_eq!(resp.client_id, client_id);
4809            assert_eq!(resp.instrument_id, instrument_id);
4810            assert!(resp.data.len() == 1);
4811            assert!(resp.ts_init > 0);
4812
4813            // Verify trade tick structure
4814            let tick = &resp.data[0];
4815            assert_eq!(tick.instrument_id, instrument_id);
4816            assert!(tick.ts_event > 0);
4817            assert!(tick.ts_init > 0);
4818        }
4819    }
4820
4821    #[tokio::test]
4822    async fn test_request_trades_no_instrument_in_cache() {
4823        // Test empty response when instrument not in cache
4824        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4825        set_data_event_sender(sender);
4826
4827        let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4828        let config = DydxDataClientConfig::default();
4829        let http_client = DydxHttpClient::default();
4830        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4831
4832        // Don't add instrument to cache
4833        let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4834
4835        let request = RequestTrades::new(
4836            instrument_id,
4837            None,
4838            None,
4839            None,
4840            Some(client_id),
4841            UUID4::new(),
4842            get_atomic_clock_realtime().get_time_ns(),
4843            None,
4844        );
4845
4846        assert!(client.request_trades(&request).is_ok());
4847
4848        // Should receive empty response when instrument not found
4849        let timeout = tokio::time::Duration::from_millis(500);
4850        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4851            tokio::time::timeout(timeout, rx.recv()).await
4852        {
4853            assert!(resp.data.is_empty());
4854        }
4855    }
4856
4857    #[tokio::test]
4858    async fn test_request_trades_limit_parameter() {
4859        // Test limit parameter handling
4860        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4861        set_data_event_sender(sender);
4862
4863        let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4864
4865        let state = TradesTestState {
4866            response: Arc::new(trades_response),
4867            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4868            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4869        };
4870
4871        let addr = start_trades_test_server(state.clone()).await;
4872        let base_url = format!("http://{addr}");
4873
4874        let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4875        let config = DydxDataClientConfig {
4876            base_url_http: Some(base_url),
4877            is_testnet: true,
4878            ..Default::default()
4879        };
4880
4881        let http_client = DydxHttpClient::new(
4882            config.base_url_http.clone(),
4883            config.http_timeout_secs,
4884            config.http_proxy_url.clone(),
4885            config.is_testnet,
4886            None,
4887        )
4888        .unwrap();
4889
4890        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4891
4892        let instrument = create_test_instrument_any();
4893        let instrument_id = instrument.id();
4894        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4895        client.instruments.insert(symbol_key, instrument);
4896
4897        // Test with limit
4898        let limit = std::num::NonZeroUsize::new(500).unwrap();
4899        let request = RequestTrades::new(
4900            instrument_id,
4901            None,
4902            None,
4903            Some(limit),
4904            Some(client_id),
4905            UUID4::new(),
4906            get_atomic_clock_realtime().get_time_ns(),
4907            None,
4908        );
4909
4910        assert!(client.request_trades(&request).is_ok());
4911
4912        let state_clone = state.clone();
4913        wait_until_async(
4914            || async { state_clone.last_limit.lock().await.is_some() },
4915            Duration::from_secs(5),
4916        )
4917        .await;
4918
4919        // Verify limit was passed to HTTP client
4920        let last_limit = *state.last_limit.lock().await;
4921        assert_eq!(last_limit, Some(Some(500)));
4922    }
4923
4924    #[rstest]
4925    fn test_request_trades_symbol_conversion() {
4926        // Test symbol conversion (strip -PERP suffix)
4927        setup_test_env();
4928
4929        let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4930        let config = DydxDataClientConfig::default();
4931        let http_client = DydxHttpClient::default();
4932        let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4933
4934        // Verify symbol format for various instruments
4935        let test_cases = vec![
4936            ("BTC-USD-PERP.DYDX", "BTC-USD"),
4937            ("ETH-USD-PERP.DYDX", "ETH-USD"),
4938            ("SOL-USD-PERP.DYDX", "SOL-USD"),
4939        ];
4940
4941        for (instrument_id_str, expected_ticker) in test_cases {
4942            let instrument_id = InstrumentId::from(instrument_id_str);
4943            let ticker = instrument_id
4944                .symbol
4945                .as_str()
4946                .trim_end_matches("-PERP")
4947                .to_string();
4948            assert_eq!(ticker, expected_ticker);
4949        }
4950    }
4951
4952    #[tokio::test]
4953    async fn test_http_404_handling() {
4954        // Test HTTP 404 handling (instrument not found)
4955        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4956        set_data_event_sender(sender);
4957
4958        let client_id = ClientId::from("DYDX-404");
4959        let config = DydxDataClientConfig {
4960            base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4961            http_timeout_secs: Some(1),
4962            ..Default::default()
4963        };
4964
4965        let http_client = DydxHttpClient::new(
4966            config.base_url_http.clone(),
4967            config.http_timeout_secs,
4968            config.http_proxy_url.clone(),
4969            config.is_testnet,
4970            None,
4971        )
4972        .unwrap();
4973
4974        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4975
4976        let request = RequestInstruments::new(
4977            None,
4978            None,
4979            Some(client_id),
4980            Some(*DYDX_VENUE),
4981            UUID4::new(),
4982            get_atomic_clock_realtime().get_time_ns(),
4983            None,
4984        );
4985
4986        assert!(client.request_instruments(&request).is_ok());
4987
4988        // Should receive empty response on 404
4989        let timeout = tokio::time::Duration::from_secs(2);
4990        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4991            tokio::time::timeout(timeout, rx.recv()).await
4992        {
4993            assert!(resp.data.is_empty(), "Expected empty response on 404");
4994        }
4995    }
4996
4997    #[tokio::test]
4998    async fn test_http_500_handling() {
4999        // Test HTTP 500 handling (internal server error)
5000        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5001        set_data_event_sender(sender);
5002
5003        let client_id = ClientId::from("DYDX-500");
5004        let config = DydxDataClientConfig {
5005            base_url_http: Some("http://httpstat.us/500".to_string()),
5006            http_timeout_secs: Some(2),
5007            ..Default::default()
5008        };
5009
5010        let http_client = DydxHttpClient::new(
5011            config.base_url_http.clone(),
5012            config.http_timeout_secs,
5013            config.http_proxy_url.clone(),
5014            config.is_testnet,
5015            None,
5016        )
5017        .unwrap();
5018
5019        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5020
5021        let instrument = create_test_instrument_any();
5022        let instrument_id = instrument.id();
5023        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5024        client.instruments.insert(symbol_key, instrument);
5025
5026        let request = RequestTrades::new(
5027            instrument_id,
5028            None,
5029            None,
5030            None,
5031            Some(client_id),
5032            UUID4::new(),
5033            get_atomic_clock_realtime().get_time_ns(),
5034            None,
5035        );
5036
5037        assert!(client.request_trades(&request).is_ok());
5038
5039        // Should receive empty response on 500 error
5040        let timeout = tokio::time::Duration::from_secs(3);
5041        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5042            tokio::time::timeout(timeout, rx.recv()).await
5043        {
5044            assert!(resp.data.is_empty(), "Expected empty response on 500");
5045        }
5046    }
5047
5048    #[tokio::test]
5049    async fn test_network_timeout_handling() {
5050        // Test network timeout scenarios
5051        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5052        set_data_event_sender(sender);
5053
5054        let client_id = ClientId::from("DYDX-TIMEOUT");
5055        let config = DydxDataClientConfig {
5056            base_url_http: Some("http://10.255.255.1:81".to_string()), // Non-routable IP
5057            http_timeout_secs: Some(1),                                // Very short timeout
5058            ..Default::default()
5059        };
5060
5061        let http_client = DydxHttpClient::new(
5062            config.base_url_http.clone(),
5063            config.http_timeout_secs,
5064            config.http_proxy_url.clone(),
5065            config.is_testnet,
5066            None,
5067        )
5068        .unwrap();
5069
5070        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5071
5072        let request = RequestInstruments::new(
5073            None,
5074            None,
5075            Some(client_id),
5076            Some(*DYDX_VENUE),
5077            UUID4::new(),
5078            get_atomic_clock_realtime().get_time_ns(),
5079            None,
5080        );
5081
5082        assert!(client.request_instruments(&request).is_ok());
5083
5084        // Should timeout and return empty response
5085        let timeout = tokio::time::Duration::from_secs(3);
5086        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5087            tokio::time::timeout(timeout, rx.recv()).await
5088        {
5089            assert!(resp.data.is_empty(), "Expected empty response on timeout");
5090        }
5091    }
5092
5093    #[tokio::test]
5094    async fn test_connection_refused_handling() {
5095        // Test connection refused errors
5096        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5097        set_data_event_sender(sender);
5098
5099        let client_id = ClientId::from("DYDX-REFUSED");
5100        let config = DydxDataClientConfig {
5101            base_url_http: Some("http://localhost:9999".to_string()), // Port unlikely to be open
5102            http_timeout_secs: Some(1),
5103            ..Default::default()
5104        };
5105
5106        let http_client = DydxHttpClient::new(
5107            config.base_url_http.clone(),
5108            config.http_timeout_secs,
5109            config.http_proxy_url.clone(),
5110            config.is_testnet,
5111            None,
5112        )
5113        .unwrap();
5114
5115        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5116
5117        let instrument = create_test_instrument_any();
5118        let instrument_id = instrument.id();
5119        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5120        client.instruments.insert(symbol_key, instrument);
5121
5122        let request = RequestInstrument::new(
5123            instrument_id,
5124            None,
5125            None,
5126            Some(client_id),
5127            UUID4::new(),
5128            get_atomic_clock_realtime().get_time_ns(),
5129            None,
5130        );
5131
5132        assert!(client.request_instrument(&request).is_ok());
5133
5134        // Should handle connection refused gracefully
5135        let timeout = tokio::time::Duration::from_secs(2);
5136        let result = tokio::time::timeout(timeout, rx.recv()).await;
5137
5138        // May not receive response if connection fails before spawning handler
5139        // This is acceptable - the important part is no panic
5140        match result {
5141            Ok(Some(DataEvent::Response(_))) => {
5142                // Response received (empty data expected)
5143            }
5144            Ok(None) | Err(_) => {
5145                // No response or timeout - acceptable for connection refused
5146            }
5147            _ => {}
5148        }
5149    }
5150
5151    #[tokio::test]
5152    async fn test_dns_resolution_failure_handling() {
5153        // Test DNS resolution failures
5154        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5155        set_data_event_sender(sender);
5156
5157        let client_id = ClientId::from("DYDX-DNS");
5158        let config = DydxDataClientConfig {
5159            base_url_http: Some(
5160                "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5161            ),
5162            http_timeout_secs: Some(2),
5163            ..Default::default()
5164        };
5165
5166        let http_client = DydxHttpClient::new(
5167            config.base_url_http.clone(),
5168            config.http_timeout_secs,
5169            config.http_proxy_url.clone(),
5170            config.is_testnet,
5171            None,
5172        )
5173        .unwrap();
5174
5175        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5176
5177        let request = RequestInstruments::new(
5178            None,
5179            None,
5180            Some(client_id),
5181            Some(*DYDX_VENUE),
5182            UUID4::new(),
5183            get_atomic_clock_realtime().get_time_ns(),
5184            None,
5185        );
5186
5187        assert!(client.request_instruments(&request).is_ok());
5188
5189        // Should handle DNS failure gracefully with empty response
5190        let timeout = tokio::time::Duration::from_secs(3);
5191        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5192            tokio::time::timeout(timeout, rx.recv()).await
5193        {
5194            assert!(
5195                resp.data.is_empty(),
5196                "Expected empty response on DNS failure"
5197            );
5198        }
5199    }
5200
5201    #[tokio::test]
5202    async fn test_http_502_503_handling() {
5203        // Test HTTP 502/503 handling (bad gateway/service unavailable)
5204        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5205        set_data_event_sender(sender);
5206
5207        let client_id = ClientId::from("DYDX-503");
5208        let config = DydxDataClientConfig {
5209            base_url_http: Some("http://httpstat.us/503".to_string()),
5210            http_timeout_secs: Some(2),
5211            ..Default::default()
5212        };
5213
5214        let http_client = DydxHttpClient::new(
5215            config.base_url_http.clone(),
5216            config.http_timeout_secs,
5217            config.http_proxy_url.clone(),
5218            config.is_testnet,
5219            None,
5220        )
5221        .unwrap();
5222
5223        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5224
5225        let request = RequestInstruments::new(
5226            None,
5227            None,
5228            Some(client_id),
5229            Some(*DYDX_VENUE),
5230            UUID4::new(),
5231            get_atomic_clock_realtime().get_time_ns(),
5232            None,
5233        );
5234
5235        assert!(client.request_instruments(&request).is_ok());
5236
5237        // Should handle 503 gracefully with empty response
5238        let timeout = tokio::time::Duration::from_secs(3);
5239        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5240            tokio::time::timeout(timeout, rx.recv()).await
5241        {
5242            assert!(resp.data.is_empty(), "Expected empty response on 503");
5243        }
5244    }
5245
5246    #[tokio::test]
5247    async fn test_http_429_rate_limit_handling() {
5248        // Test HTTP 429 handling (rate limit exceeded)
5249        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5250        set_data_event_sender(sender);
5251
5252        let client_id = ClientId::from("DYDX-429");
5253        let config = DydxDataClientConfig {
5254            base_url_http: Some("http://httpstat.us/429".to_string()),
5255            http_timeout_secs: Some(2),
5256            ..Default::default()
5257        };
5258
5259        let http_client = DydxHttpClient::new(
5260            config.base_url_http.clone(),
5261            config.http_timeout_secs,
5262            config.http_proxy_url.clone(),
5263            config.is_testnet,
5264            None,
5265        )
5266        .unwrap();
5267
5268        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5269
5270        let instrument = create_test_instrument_any();
5271        let instrument_id = instrument.id();
5272        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5273        client.instruments.insert(symbol_key, instrument);
5274
5275        let request = RequestTrades::new(
5276            instrument_id,
5277            None,
5278            None,
5279            None,
5280            Some(client_id),
5281            UUID4::new(),
5282            get_atomic_clock_realtime().get_time_ns(),
5283            None,
5284        );
5285
5286        assert!(client.request_trades(&request).is_ok());
5287
5288        // Should handle rate limit with empty response
5289        let timeout = tokio::time::Duration::from_secs(3);
5290        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5291            tokio::time::timeout(timeout, rx.recv()).await
5292        {
5293            assert!(
5294                resp.data.is_empty(),
5295                "Expected empty response on rate limit"
5296            );
5297        }
5298    }
5299
5300    #[tokio::test]
5301    async fn test_error_handling_does_not_panic() {
5302        // Test that error scenarios don't cause panics
5303        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5304        set_data_event_sender(sender);
5305
5306        let client_id = ClientId::from("DYDX-NO-PANIC");
5307        let config = DydxDataClientConfig {
5308            base_url_http: Some("http://invalid".to_string()),
5309            ..Default::default()
5310        };
5311
5312        let http_client = DydxHttpClient::new(
5313            config.base_url_http.clone(),
5314            config.http_timeout_secs,
5315            config.http_proxy_url.clone(),
5316            config.is_testnet,
5317            None,
5318        )
5319        .unwrap();
5320
5321        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5322
5323        // All these should return Ok() without panicking
5324        let request_instruments = RequestInstruments::new(
5325            None,
5326            None,
5327            Some(client_id),
5328            Some(*DYDX_VENUE),
5329            UUID4::new(),
5330            get_atomic_clock_realtime().get_time_ns(),
5331            None,
5332        );
5333        assert!(client.request_instruments(&request_instruments).is_ok());
5334
5335        let instrument_id = InstrumentId::from("INVALID.DYDX");
5336        let request_instrument = RequestInstrument::new(
5337            instrument_id,
5338            None,
5339            None,
5340            Some(client_id),
5341            UUID4::new(),
5342            get_atomic_clock_realtime().get_time_ns(),
5343            None,
5344        );
5345        assert!(client.request_instrument(&request_instrument).is_ok());
5346
5347        let request_trades = RequestTrades::new(
5348            instrument_id,
5349            None,
5350            None,
5351            None,
5352            Some(client_id),
5353            UUID4::new(),
5354            get_atomic_clock_realtime().get_time_ns(),
5355            None,
5356        );
5357        assert!(client.request_trades(&request_trades).is_ok());
5358    }
5359
5360    #[tokio::test]
5361    async fn test_malformed_json_response() {
5362        // Test handling of malformed JSON from API
5363        use axum::{Router, routing::get};
5364
5365        #[derive(Clone)]
5366        struct MalformedState;
5367
5368        async fn malformed_markets_handler() -> String {
5369            // Invalid JSON - missing closing brace
5370            r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5371        }
5372
5373        let app = Router::new()
5374            .route("/v4/markets", get(malformed_markets_handler))
5375            .with_state(MalformedState);
5376
5377        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5378        let server_addr = listener.local_addr().unwrap();
5379        let port = server_addr.port();
5380
5381        tokio::spawn(async move {
5382            axum::serve(listener, app).await.unwrap();
5383        });
5384
5385        wait_for_server(server_addr).await;
5386
5387        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5388        set_data_event_sender(sender);
5389
5390        let client_id = ClientId::from("DYDX-MALFORMED");
5391        let config = DydxDataClientConfig {
5392            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5393            http_timeout_secs: Some(2),
5394            ..Default::default()
5395        };
5396
5397        let http_client = DydxHttpClient::new(
5398            config.base_url_http.clone(),
5399            config.http_timeout_secs,
5400            config.http_proxy_url.clone(),
5401            config.is_testnet,
5402            None,
5403        )
5404        .unwrap();
5405
5406        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5407
5408        let request = RequestInstruments::new(
5409            None,
5410            None,
5411            Some(client_id),
5412            Some(*DYDX_VENUE),
5413            UUID4::new(),
5414            get_atomic_clock_realtime().get_time_ns(),
5415            None,
5416        );
5417
5418        assert!(client.request_instruments(&request).is_ok());
5419
5420        // Should handle malformed JSON gracefully with empty response
5421        let timeout = tokio::time::Duration::from_secs(3);
5422        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5423            tokio::time::timeout(timeout, rx.recv()).await
5424        {
5425            assert!(
5426                resp.data.is_empty(),
5427                "Expected empty response on malformed JSON"
5428            );
5429        }
5430    }
5431
5432    #[tokio::test]
5433    async fn test_missing_required_fields_in_response() {
5434        // Test handling when API response missing required fields
5435        use axum::{Json, Router, routing::get};
5436        use serde_json::{Value, json};
5437
5438        #[derive(Clone)]
5439        struct MissingFieldsState;
5440
5441        async fn missing_fields_handler() -> Json<Value> {
5442            // Missing critical fields like "ticker", "stepSize", etc.
5443            Json(json!({
5444                "markets": {
5445                    "BTC-USD": {
5446                        // Missing "ticker"
5447                        "status": "ACTIVE",
5448                        "baseAsset": "BTC",
5449                        "quoteAsset": "USD",
5450                        // Missing "stepSize", "tickSize", "minOrderSize"
5451                    }
5452                }
5453            }))
5454        }
5455
5456        let app = Router::new()
5457            .route("/v4/markets", get(missing_fields_handler))
5458            .with_state(MissingFieldsState);
5459
5460        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5461        let server_addr = listener.local_addr().unwrap();
5462        let port = server_addr.port();
5463
5464        tokio::spawn(async move {
5465            axum::serve(listener, app).await.unwrap();
5466        });
5467
5468        wait_for_server(server_addr).await;
5469
5470        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5471        set_data_event_sender(sender);
5472
5473        let client_id = ClientId::from("DYDX-MISSING");
5474        let config = DydxDataClientConfig {
5475            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5476            http_timeout_secs: Some(2),
5477            ..Default::default()
5478        };
5479
5480        let http_client = DydxHttpClient::new(
5481            config.base_url_http.clone(),
5482            config.http_timeout_secs,
5483            config.http_proxy_url.clone(),
5484            config.is_testnet,
5485            None,
5486        )
5487        .unwrap();
5488
5489        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5490
5491        let request = RequestInstruments::new(
5492            None,
5493            None,
5494            Some(client_id),
5495            Some(*DYDX_VENUE),
5496            UUID4::new(),
5497            get_atomic_clock_realtime().get_time_ns(),
5498            None,
5499        );
5500
5501        assert!(client.request_instruments(&request).is_ok());
5502
5503        // Should handle missing fields gracefully (may skip instruments or return empty)
5504        let timeout = tokio::time::Duration::from_secs(3);
5505        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5506            tokio::time::timeout(timeout, rx.recv()).await
5507        {
5508            // Parse errors should result in empty or partial response
5509            // The important part is no panic
5510            assert!(resp.correlation_id == request.request_id);
5511        }
5512    }
5513
5514    #[tokio::test]
5515    async fn test_invalid_data_types_in_response() {
5516        // Test handling when API returns wrong data types
5517        use axum::{Json, Router, routing::get};
5518        use serde_json::{Value, json};
5519
5520        #[derive(Clone)]
5521        struct InvalidTypesState;
5522
5523        async fn invalid_types_handler() -> Json<Value> {
5524            // Wrong data types - strings instead of numbers, etc.
5525            Json(json!({
5526                "markets": {
5527                    "BTC-USD": {
5528                        "ticker": "BTC-USD",
5529                        "status": "ACTIVE",
5530                        "baseAsset": "BTC",
5531                        "quoteAsset": "USD",
5532                        "stepSize": "not_a_number",  // Should be numeric
5533                        "tickSize": true,  // Should be numeric
5534                        "minOrderSize": ["array"],  // Should be numeric
5535                        "market": 12345,  // Should be string
5536                    }
5537                }
5538            }))
5539        }
5540
5541        let app = Router::new()
5542            .route("/v4/markets", get(invalid_types_handler))
5543            .with_state(InvalidTypesState);
5544
5545        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5546        let server_addr = listener.local_addr().unwrap();
5547        let port = server_addr.port();
5548
5549        tokio::spawn(async move {
5550            axum::serve(listener, app).await.unwrap();
5551        });
5552
5553        wait_for_server(server_addr).await;
5554
5555        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5556        set_data_event_sender(sender);
5557
5558        let client_id = ClientId::from("DYDX-TYPES");
5559        let config = DydxDataClientConfig {
5560            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5561            http_timeout_secs: Some(2),
5562            ..Default::default()
5563        };
5564
5565        let http_client = DydxHttpClient::new(
5566            config.base_url_http.clone(),
5567            config.http_timeout_secs,
5568            config.http_proxy_url.clone(),
5569            config.is_testnet,
5570            None,
5571        )
5572        .unwrap();
5573
5574        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5575
5576        let request = RequestInstruments::new(
5577            None,
5578            None,
5579            Some(client_id),
5580            Some(*DYDX_VENUE),
5581            UUID4::new(),
5582            get_atomic_clock_realtime().get_time_ns(),
5583            None,
5584        );
5585
5586        assert!(client.request_instruments(&request).is_ok());
5587
5588        // Should handle type errors gracefully
5589        let timeout = tokio::time::Duration::from_secs(3);
5590        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5591            tokio::time::timeout(timeout, rx.recv()).await
5592        {
5593            // Type mismatch should result in parse failure and empty/partial response
5594            assert!(resp.correlation_id == request.request_id);
5595        }
5596    }
5597
5598    #[tokio::test]
5599    async fn test_unexpected_response_structure() {
5600        // Test handling when API response has completely unexpected structure
5601        use axum::{Json, Router, routing::get};
5602        use serde_json::{Value, json};
5603
5604        #[derive(Clone)]
5605        struct UnexpectedState;
5606
5607        async fn unexpected_structure_handler() -> Json<Value> {
5608            // Completely different structure than expected
5609            Json(json!({
5610                "error": "Something went wrong",
5611                "code": 500,
5612                "data": null,
5613                "unexpected_field": {
5614                    "nested": {
5615                        "deeply": [1, 2, 3]
5616                    }
5617                }
5618            }))
5619        }
5620
5621        let app = Router::new()
5622            .route("/v4/markets", get(unexpected_structure_handler))
5623            .with_state(UnexpectedState);
5624
5625        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5626        let server_addr = listener.local_addr().unwrap();
5627        let port = server_addr.port();
5628
5629        tokio::spawn(async move {
5630            axum::serve(listener, app).await.unwrap();
5631        });
5632
5633        wait_for_server(server_addr).await;
5634
5635        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5636        set_data_event_sender(sender);
5637
5638        let client_id = ClientId::from("DYDX-STRUCT");
5639        let config = DydxDataClientConfig {
5640            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5641            http_timeout_secs: Some(2),
5642            ..Default::default()
5643        };
5644
5645        let http_client = DydxHttpClient::new(
5646            config.base_url_http.clone(),
5647            config.http_timeout_secs,
5648            config.http_proxy_url.clone(),
5649            config.is_testnet,
5650            None,
5651        )
5652        .unwrap();
5653
5654        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5655
5656        let request = RequestInstruments::new(
5657            None,
5658            None,
5659            Some(client_id),
5660            Some(*DYDX_VENUE),
5661            UUID4::new(),
5662            get_atomic_clock_realtime().get_time_ns(),
5663            None,
5664        );
5665
5666        assert!(client.request_instruments(&request).is_ok());
5667
5668        // Should handle unexpected structure gracefully with empty response
5669        let timeout = tokio::time::Duration::from_secs(3);
5670        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5671            tokio::time::timeout(timeout, rx.recv()).await
5672        {
5673            assert!(
5674                resp.data.is_empty(),
5675                "Expected empty response on unexpected structure"
5676            );
5677            assert!(resp.correlation_id == request.request_id);
5678        }
5679    }
5680
5681    #[tokio::test]
5682    async fn test_empty_markets_object_in_response() {
5683        // Test handling when markets object is empty (valid JSON but no data)
5684        use axum::{Json, Router, routing::get};
5685        use serde_json::{Value, json};
5686
5687        #[derive(Clone)]
5688        struct EmptyMarketsState;
5689
5690        async fn empty_markets_handler() -> Json<Value> {
5691            Json(json!({
5692                "markets": {}
5693            }))
5694        }
5695
5696        let app = Router::new()
5697            .route("/v4/markets", get(empty_markets_handler))
5698            .with_state(EmptyMarketsState);
5699
5700        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5701        let server_addr = listener.local_addr().unwrap();
5702        let port = server_addr.port();
5703
5704        tokio::spawn(async move {
5705            axum::serve(listener, app).await.unwrap();
5706        });
5707
5708        wait_for_server(server_addr).await;
5709
5710        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5711        set_data_event_sender(sender);
5712
5713        let client_id = ClientId::from("DYDX-EMPTY");
5714        let config = DydxDataClientConfig {
5715            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5716            http_timeout_secs: Some(2),
5717            ..Default::default()
5718        };
5719
5720        let http_client = DydxHttpClient::new(
5721            config.base_url_http.clone(),
5722            config.http_timeout_secs,
5723            config.http_proxy_url.clone(),
5724            config.is_testnet,
5725            None,
5726        )
5727        .unwrap();
5728
5729        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5730
5731        let request = RequestInstruments::new(
5732            None,
5733            None,
5734            Some(client_id),
5735            Some(*DYDX_VENUE),
5736            UUID4::new(),
5737            get_atomic_clock_realtime().get_time_ns(),
5738            None,
5739        );
5740
5741        assert!(client.request_instruments(&request).is_ok());
5742
5743        // Should handle empty markets gracefully
5744        let timeout = tokio::time::Duration::from_secs(3);
5745        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5746            tokio::time::timeout(timeout, rx.recv()).await
5747        {
5748            assert!(
5749                resp.data.is_empty(),
5750                "Expected empty response for empty markets"
5751            );
5752            assert!(resp.correlation_id == request.request_id);
5753        }
5754    }
5755
5756    #[tokio::test]
5757    async fn test_null_values_in_response() {
5758        // Test handling of null values in critical fields
5759        use axum::{Json, Router, routing::get};
5760        use serde_json::{Value, json};
5761
5762        #[derive(Clone)]
5763        struct NullValuesState;
5764
5765        async fn null_values_handler() -> Json<Value> {
5766            Json(json!({
5767                "markets": {
5768                    "BTC-USD": {
5769                        "ticker": null,
5770                        "status": "ACTIVE",
5771                        "baseAsset": null,
5772                        "quoteAsset": "USD",
5773                        "stepSize": null,
5774                    }
5775                }
5776            }))
5777        }
5778
5779        let app = Router::new()
5780            .route("/v4/markets", get(null_values_handler))
5781            .with_state(NullValuesState);
5782
5783        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5784        let server_addr = listener.local_addr().unwrap();
5785        let port = server_addr.port();
5786
5787        tokio::spawn(async move {
5788            axum::serve(listener, app).await.unwrap();
5789        });
5790
5791        wait_for_server(server_addr).await;
5792
5793        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5794        set_data_event_sender(sender);
5795
5796        let client_id = ClientId::from("DYDX-NULL");
5797        let config = DydxDataClientConfig {
5798            base_url_http: Some(format!("http://127.0.0.1:{port}")),
5799            http_timeout_secs: Some(2),
5800            ..Default::default()
5801        };
5802
5803        let http_client = DydxHttpClient::new(
5804            config.base_url_http.clone(),
5805            config.http_timeout_secs,
5806            config.http_proxy_url.clone(),
5807            config.is_testnet,
5808            None,
5809        )
5810        .unwrap();
5811
5812        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5813
5814        let request = RequestInstruments::new(
5815            None,
5816            None,
5817            Some(client_id),
5818            Some(*DYDX_VENUE),
5819            UUID4::new(),
5820            get_atomic_clock_realtime().get_time_ns(),
5821            None,
5822        );
5823
5824        assert!(client.request_instruments(&request).is_ok());
5825
5826        // Should handle null values gracefully
5827        let timeout = tokio::time::Duration::from_secs(3);
5828        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5829            tokio::time::timeout(timeout, rx.recv()).await
5830        {
5831            // Null values should cause parse failures and result in empty/partial response
5832            assert!(resp.correlation_id == request.request_id);
5833        }
5834    }
5835
5836    #[tokio::test]
5837    async fn test_invalid_instrument_id_format() {
5838        // Test handling of non-existent instrument (valid ID format but doesn't exist)
5839        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5840        set_data_event_sender(sender);
5841
5842        let client_id = ClientId::from("DYDX-INVALID-ID");
5843        let config = DydxDataClientConfig::default();
5844
5845        let http_client = DydxHttpClient::new(
5846            config.base_url_http.clone(),
5847            config.http_timeout_secs,
5848            config.http_proxy_url.clone(),
5849            config.is_testnet,
5850            None,
5851        )
5852        .unwrap();
5853
5854        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5855
5856        // Valid format but non-existent instrument
5857        let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5858
5859        let request = RequestInstrument::new(
5860            non_existent_id,
5861            None,
5862            None,
5863            Some(client_id),
5864            UUID4::new(),
5865            get_atomic_clock_realtime().get_time_ns(),
5866            None,
5867        );
5868
5869        assert!(client.request_instrument(&request).is_ok());
5870
5871        // Should handle non-existent instrument gracefully
5872        let timeout = tokio::time::Duration::from_secs(2);
5873        let result = tokio::time::timeout(timeout, rx.recv()).await;
5874
5875        // Either no response or empty response is acceptable for non-existent instrument
5876        match result {
5877            Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5878                // Empty response acceptable
5879            }
5880            Ok(None) | Err(_) => {
5881                // Timeout or no response also acceptable
5882            }
5883            _ => {}
5884        }
5885    }
5886
5887    #[tokio::test]
5888    async fn test_invalid_date_range_end_before_start() {
5889        // Test handling when end date is before start date
5890        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5891        set_data_event_sender(sender);
5892
5893        let client_id = ClientId::from("DYDX-DATE-RANGE");
5894        let config = DydxDataClientConfig::default();
5895
5896        let http_client = DydxHttpClient::new(
5897            config.base_url_http.clone(),
5898            config.http_timeout_secs,
5899            config.http_proxy_url.clone(),
5900            config.is_testnet,
5901            None,
5902        )
5903        .unwrap();
5904
5905        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5906
5907        let instrument = create_test_instrument_any();
5908        let instrument_id = instrument.id();
5909        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5910        client.instruments.insert(symbol_key, instrument);
5911
5912        // Invalid date range: end is before start
5913        let start = Utc::now();
5914        let end = start - chrono::Duration::hours(24); // End is 24 hours before start
5915
5916        let request = RequestTrades::new(
5917            instrument_id,
5918            Some(start),
5919            Some(end),
5920            None,
5921            Some(client_id),
5922            UUID4::new(),
5923            get_atomic_clock_realtime().get_time_ns(),
5924            None,
5925        );
5926
5927        assert!(client.request_trades(&request).is_ok());
5928
5929        // Should handle invalid range gracefully - may return empty or no response
5930        let timeout = tokio::time::Duration::from_secs(2);
5931        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5932            tokio::time::timeout(timeout, rx.recv()).await
5933        {
5934            // Empty response expected for invalid date range
5935            assert!(resp.correlation_id == request.request_id);
5936        }
5937    }
5938
5939    #[tokio::test]
5940    async fn test_negative_limit_value() {
5941        // Test handling of limit edge cases
5942        // Note: Rust's NonZeroUsize prevents negative/zero values at type level
5943        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5944        set_data_event_sender(sender);
5945
5946        let client_id = ClientId::from("DYDX-NEG-LIMIT");
5947        let config = DydxDataClientConfig::default();
5948
5949        let http_client = DydxHttpClient::new(
5950            config.base_url_http.clone(),
5951            config.http_timeout_secs,
5952            config.http_proxy_url.clone(),
5953            config.is_testnet,
5954            None,
5955        )
5956        .unwrap();
5957
5958        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5959
5960        let instrument = create_test_instrument_any();
5961        let instrument_id = instrument.id();
5962        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5963        client.instruments.insert(symbol_key, instrument);
5964
5965        // Minimum valid limit (1)
5966        let request = RequestTrades::new(
5967            instrument_id,
5968            None,
5969            None,
5970            std::num::NonZeroUsize::new(1), // Minimum valid value
5971            Some(client_id),
5972            UUID4::new(),
5973            get_atomic_clock_realtime().get_time_ns(),
5974            None,
5975        );
5976
5977        // Should not panic with minimum limit
5978        assert!(client.request_trades(&request).is_ok());
5979    }
5980
5981    #[tokio::test]
5982    async fn test_zero_limit_value() {
5983        // Test handling of no limit (None = use API default)
5984        // Note: NonZeroUsize type prevents actual zero, so None represents "no limit"
5985        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5986        set_data_event_sender(sender);
5987
5988        let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5989        let config = DydxDataClientConfig::default();
5990
5991        let http_client = DydxHttpClient::new(
5992            config.base_url_http.clone(),
5993            config.http_timeout_secs,
5994            config.http_proxy_url.clone(),
5995            config.is_testnet,
5996            None,
5997        )
5998        .unwrap();
5999
6000        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6001
6002        let instrument = create_test_instrument_any();
6003        let instrument_id = instrument.id();
6004        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6005        client.instruments.insert(symbol_key, instrument);
6006
6007        let request = RequestTrades::new(
6008            instrument_id,
6009            None,
6010            None,
6011            None, // No limit specified (None = use default)
6012            Some(client_id),
6013            UUID4::new(),
6014            get_atomic_clock_realtime().get_time_ns(),
6015            None,
6016        );
6017
6018        assert!(client.request_trades(&request).is_ok());
6019
6020        // Should handle None limit gracefully (uses API default)
6021        let timeout = tokio::time::Duration::from_secs(2);
6022        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6023            tokio::time::timeout(timeout, rx.recv()).await
6024        {
6025            assert!(resp.correlation_id == request.request_id);
6026        }
6027    }
6028
6029    #[tokio::test]
6030    async fn test_very_large_limit_value() {
6031        // Test handling of extremely large limit values (boundary testing)
6032        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6033        set_data_event_sender(sender);
6034
6035        let client_id = ClientId::from("DYDX-LARGE-LIMIT");
6036        let config = DydxDataClientConfig::default();
6037
6038        let http_client = DydxHttpClient::new(
6039            config.base_url_http.clone(),
6040            config.http_timeout_secs,
6041            config.http_proxy_url.clone(),
6042            config.is_testnet,
6043            None,
6044        )
6045        .unwrap();
6046
6047        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6048
6049        let instrument = create_test_instrument_any();
6050        let instrument_id = instrument.id();
6051        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6052        client.instruments.insert(symbol_key, instrument);
6053
6054        let request = RequestTrades::new(
6055            instrument_id,
6056            None,
6057            None,
6058            std::num::NonZeroUsize::new(1_000_000), // Very large limit
6059            Some(client_id),
6060            UUID4::new(),
6061            get_atomic_clock_realtime().get_time_ns(),
6062            None,
6063        );
6064
6065        // Should not panic with very large limit
6066        assert!(client.request_trades(&request).is_ok());
6067
6068        // Should handle large limit gracefully
6069        let timeout = tokio::time::Duration::from_secs(2);
6070        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6071            tokio::time::timeout(timeout, rx.recv()).await
6072        {
6073            assert!(resp.correlation_id == request.request_id);
6074        }
6075    }
6076
6077    #[tokio::test]
6078    async fn test_none_limit_uses_default() {
6079        // Test that None limit falls back to default behavior
6080        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6081        set_data_event_sender(sender);
6082
6083        let client_id = ClientId::from("DYDX-NONE-LIMIT");
6084        let config = DydxDataClientConfig::default();
6085
6086        let http_client = DydxHttpClient::new(
6087            config.base_url_http.clone(),
6088            config.http_timeout_secs,
6089            config.http_proxy_url.clone(),
6090            config.is_testnet,
6091            None,
6092        )
6093        .unwrap();
6094
6095        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6096
6097        let instrument = create_test_instrument_any();
6098        let instrument_id = instrument.id();
6099        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6100        client.instruments.insert(symbol_key, instrument);
6101
6102        let request = RequestTrades::new(
6103            instrument_id,
6104            None,
6105            None,
6106            None, // No limit specified
6107            Some(client_id),
6108            UUID4::new(),
6109            get_atomic_clock_realtime().get_time_ns(),
6110            None,
6111        );
6112
6113        // Should work fine with None limit (uses API default)
6114        assert!(client.request_trades(&request).is_ok());
6115
6116        let timeout = tokio::time::Duration::from_secs(2);
6117        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6118            tokio::time::timeout(timeout, rx.recv()).await
6119        {
6120            assert!(resp.correlation_id == request.request_id);
6121        }
6122    }
6123
6124    #[tokio::test]
6125    async fn test_validation_does_not_panic() {
6126        // Test that various validation edge cases don't cause panics
6127        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6128        set_data_event_sender(sender);
6129
6130        let client_id = ClientId::from("DYDX-VALIDATION");
6131        let config = DydxDataClientConfig::default();
6132
6133        let http_client = DydxHttpClient::new(
6134            config.base_url_http.clone(),
6135            config.http_timeout_secs,
6136            config.http_proxy_url.clone(),
6137            config.is_testnet,
6138            None,
6139        )
6140        .unwrap();
6141
6142        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6143
6144        let instrument = create_test_instrument_any();
6145        let instrument_id = instrument.id();
6146        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6147        client.instruments.insert(symbol_key, instrument);
6148
6149        // Test 1: Invalid instrument ID
6150        let invalid_id = InstrumentId::from("INVALID.WRONG");
6151        let req1 = RequestInstrument::new(
6152            invalid_id,
6153            None,
6154            None,
6155            Some(client_id),
6156            UUID4::new(),
6157            get_atomic_clock_realtime().get_time_ns(),
6158            None,
6159        );
6160        assert!(client.request_instrument(&req1).is_ok());
6161
6162        // Test 2: Invalid date range
6163        let start = Utc::now();
6164        let end = start - chrono::Duration::hours(1);
6165        let req2 = RequestTrades::new(
6166            instrument_id,
6167            Some(start),
6168            Some(end),
6169            None,
6170            Some(client_id),
6171            UUID4::new(),
6172            get_atomic_clock_realtime().get_time_ns(),
6173            None,
6174        );
6175        assert!(client.request_trades(&req2).is_ok());
6176
6177        // Test 3: Minimum limit (1)
6178        let req3 = RequestTrades::new(
6179            instrument_id,
6180            None,
6181            None,
6182            std::num::NonZeroUsize::new(1),
6183            Some(client_id),
6184            UUID4::new(),
6185            get_atomic_clock_realtime().get_time_ns(),
6186            None,
6187        );
6188        assert!(client.request_trades(&req3).is_ok());
6189
6190        // Test 4: Very large limit
6191        let req4 = RequestTrades::new(
6192            instrument_id,
6193            None,
6194            None,
6195            std::num::NonZeroUsize::new(usize::MAX),
6196            Some(client_id),
6197            UUID4::new(),
6198            get_atomic_clock_realtime().get_time_ns(),
6199            None,
6200        );
6201        assert!(client.request_trades(&req4).is_ok());
6202
6203        // All validation edge cases handled without panic
6204    }
6205
6206    #[tokio::test]
6207    async fn test_instruments_response_has_correct_venue() {
6208        // Verify InstrumentsResponse includes correct DYDX venue
6209        use axum::{Json, Router, routing::get};
6210        use serde_json::{Value, json};
6211
6212        #[derive(Clone)]
6213        struct VenueTestState;
6214
6215        async fn venue_handler() -> Json<Value> {
6216            Json(json!({
6217                "markets": {
6218                    "BTC-USD": {
6219                        "ticker": "BTC-USD",
6220                        "status": "ACTIVE",
6221                        "baseAsset": "BTC",
6222                        "quoteAsset": "USD",
6223                        "stepSize": "0.0001",
6224                        "tickSize": "1",
6225                        "indexPrice": "50000",
6226                        "oraclePrice": "50000",
6227                        "priceChange24H": "1000",
6228                        "nextFundingRate": "0.0001",
6229                        "nextFundingAt": "2024-01-01T00:00:00.000Z",
6230                        "minOrderSize": "0.001",
6231                        "type": "PERPETUAL",
6232                        "initialMarginFraction": "0.05",
6233                        "maintenanceMarginFraction": "0.03",
6234                        "volume24H": "1000000",
6235                        "trades24H": "10000",
6236                        "openInterest": "5000000",
6237                        "incrementalInitialMarginFraction": "0.01",
6238                        "incrementalPositionSize": "10",
6239                        "maxPositionSize": "1000",
6240                        "baselinePositionSize": "100",
6241                        "assetResolution": "10000000000"
6242                    }
6243                }
6244            }))
6245        }
6246
6247        let app = Router::new()
6248            .route("/v4/markets", get(venue_handler))
6249            .with_state(VenueTestState);
6250
6251        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
6252        let server_addr = listener.local_addr().unwrap();
6253        let port = server_addr.port();
6254
6255        tokio::spawn(async move {
6256            axum::serve(listener, app).await.unwrap();
6257        });
6258
6259        wait_for_server(server_addr).await;
6260
6261        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6262        set_data_event_sender(sender);
6263
6264        let client_id = ClientId::from("DYDX-VENUE-TEST");
6265        let config = DydxDataClientConfig {
6266            base_url_http: Some(format!("http://127.0.0.1:{port}")),
6267            http_timeout_secs: Some(2),
6268            ..Default::default()
6269        };
6270
6271        let http_client = DydxHttpClient::new(
6272            config.base_url_http.clone(),
6273            config.http_timeout_secs,
6274            config.http_proxy_url.clone(),
6275            config.is_testnet,
6276            None,
6277        )
6278        .unwrap();
6279
6280        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6281
6282        let request = RequestInstruments::new(
6283            None,
6284            None,
6285            Some(client_id),
6286            Some(*DYDX_VENUE),
6287            UUID4::new(),
6288            get_atomic_clock_realtime().get_time_ns(),
6289            None,
6290        );
6291
6292        assert!(client.request_instruments(&request).is_ok());
6293
6294        let timeout = tokio::time::Duration::from_secs(3);
6295        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6296            tokio::time::timeout(timeout, rx.recv()).await
6297        {
6298            // Verify venue is DYDX
6299            assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6300        }
6301    }
6302
6303    #[tokio::test]
6304    async fn test_instruments_response_contains_vec_instrument_any() {
6305        // Verify InstrumentsResponse contains Vec<InstrumentAny>
6306        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6307        set_data_event_sender(sender);
6308
6309        let client_id = ClientId::from("DYDX-VEC-TEST");
6310        let config = DydxDataClientConfig::default();
6311
6312        let http_client = DydxHttpClient::new(
6313            config.base_url_http.clone(),
6314            config.http_timeout_secs,
6315            config.http_proxy_url.clone(),
6316            config.is_testnet,
6317            None,
6318        )
6319        .unwrap();
6320
6321        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6322
6323        let request = RequestInstruments::new(
6324            None,
6325            None,
6326            Some(client_id),
6327            Some(*DYDX_VENUE),
6328            UUID4::new(),
6329            get_atomic_clock_realtime().get_time_ns(),
6330            None,
6331        );
6332
6333        assert!(client.request_instruments(&request).is_ok());
6334
6335        let timeout = tokio::time::Duration::from_secs(2);
6336        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6337            tokio::time::timeout(timeout, rx.recv()).await
6338        {
6339            // Verify data is Vec<InstrumentAny>
6340            assert!(
6341                resp.data.is_empty() || !resp.data.is_empty(),
6342                "data should be Vec<InstrumentAny>"
6343            );
6344        }
6345    }
6346
6347    #[tokio::test]
6348    async fn test_instruments_response_includes_correlation_id() {
6349        // Verify InstrumentsResponse includes correlation_id matching request_id
6350        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6351        set_data_event_sender(sender);
6352
6353        let client_id = ClientId::from("DYDX-CORR-TEST");
6354        let config = DydxDataClientConfig::default();
6355
6356        let http_client = DydxHttpClient::new(
6357            config.base_url_http.clone(),
6358            config.http_timeout_secs,
6359            config.http_proxy_url.clone(),
6360            config.is_testnet,
6361            None,
6362        )
6363        .unwrap();
6364
6365        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6366
6367        let request_id = UUID4::new();
6368        let request = RequestInstruments::new(
6369            None,
6370            None,
6371            Some(client_id),
6372            Some(*DYDX_VENUE),
6373            request_id,
6374            get_atomic_clock_realtime().get_time_ns(),
6375            None,
6376        );
6377
6378        assert!(client.request_instruments(&request).is_ok());
6379
6380        let timeout = tokio::time::Duration::from_secs(2);
6381        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6382            tokio::time::timeout(timeout, rx.recv()).await
6383        {
6384            // Verify correlation_id matches request_id
6385            assert_eq!(
6386                resp.correlation_id, request_id,
6387                "correlation_id should match request_id"
6388            );
6389        }
6390    }
6391
6392    #[tokio::test]
6393    async fn test_instruments_response_includes_client_id() {
6394        // Verify InstrumentsResponse includes client_id
6395        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6396        set_data_event_sender(sender);
6397
6398        let client_id = ClientId::from("DYDX-CLIENT-TEST");
6399        let config = DydxDataClientConfig::default();
6400
6401        let http_client = DydxHttpClient::new(
6402            config.base_url_http.clone(),
6403            config.http_timeout_secs,
6404            config.http_proxy_url.clone(),
6405            config.is_testnet,
6406            None,
6407        )
6408        .unwrap();
6409
6410        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6411
6412        let request = RequestInstruments::new(
6413            None,
6414            None,
6415            Some(client_id),
6416            Some(*DYDX_VENUE),
6417            UUID4::new(),
6418            get_atomic_clock_realtime().get_time_ns(),
6419            None,
6420        );
6421
6422        assert!(client.request_instruments(&request).is_ok());
6423
6424        let timeout = tokio::time::Duration::from_secs(2);
6425        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6426            tokio::time::timeout(timeout, rx.recv()).await
6427        {
6428            // Verify client_id is included
6429            assert_eq!(
6430                resp.client_id, client_id,
6431                "client_id should be included in response"
6432            );
6433        }
6434    }
6435
6436    #[tokio::test]
6437    async fn test_instruments_response_includes_timestamps() {
6438        // Verify InstrumentsResponse includes start, end, and ts_init timestamps
6439        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6440        set_data_event_sender(sender);
6441
6442        let client_id = ClientId::from("DYDX-TS-TEST");
6443        let config = DydxDataClientConfig::default();
6444
6445        let http_client = DydxHttpClient::new(
6446            config.base_url_http.clone(),
6447            config.http_timeout_secs,
6448            config.http_proxy_url.clone(),
6449            config.is_testnet,
6450            None,
6451        )
6452        .unwrap();
6453
6454        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6455
6456        let start = Some(Utc::now() - chrono::Duration::days(1));
6457        let end = Some(Utc::now());
6458        let ts_init = get_atomic_clock_realtime().get_time_ns();
6459
6460        let request = RequestInstruments::new(
6461            start,
6462            end,
6463            Some(client_id),
6464            Some(*DYDX_VENUE),
6465            UUID4::new(),
6466            ts_init,
6467            None,
6468        );
6469
6470        assert!(client.request_instruments(&request).is_ok());
6471
6472        let timeout = tokio::time::Duration::from_secs(2);
6473        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6474            tokio::time::timeout(timeout, rx.recv()).await
6475        {
6476            // Verify timestamps are included
6477            assert!(
6478                resp.start.is_some() || resp.start.is_none(),
6479                "start timestamp field exists"
6480            );
6481            assert!(
6482                resp.end.is_some() || resp.end.is_none(),
6483                "end timestamp field exists"
6484            );
6485            assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6486        }
6487    }
6488
6489    #[tokio::test]
6490    async fn test_instruments_response_includes_params_when_provided() {
6491        // Verify InstrumentsResponse includes params when provided in request
6492        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6493        set_data_event_sender(sender);
6494
6495        let client_id = ClientId::from("DYDX-PARAMS-TEST");
6496        let config = DydxDataClientConfig::default();
6497
6498        let http_client = DydxHttpClient::new(
6499            config.base_url_http.clone(),
6500            config.http_timeout_secs,
6501            config.http_proxy_url.clone(),
6502            config.is_testnet,
6503            None,
6504        )
6505        .unwrap();
6506
6507        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6508
6509        // Since we can't easily create IndexMap in tests without importing,
6510        // just verify the params field exists by passing None
6511        let request = RequestInstruments::new(
6512            None,
6513            None,
6514            Some(client_id),
6515            Some(*DYDX_VENUE),
6516            UUID4::new(),
6517            get_atomic_clock_realtime().get_time_ns(),
6518            None, // params
6519        );
6520
6521        assert!(client.request_instruments(&request).is_ok());
6522
6523        let timeout = tokio::time::Duration::from_secs(2);
6524        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6525            tokio::time::timeout(timeout, rx.recv()).await
6526        {
6527            // Verify params field exists (structure validation)
6528            let _params = resp.params;
6529        }
6530    }
6531
6532    #[tokio::test]
6533    async fn test_instruments_response_params_none_when_not_provided() {
6534        // Verify InstrumentsResponse params is None when not provided
6535        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6536        set_data_event_sender(sender);
6537
6538        let client_id = ClientId::from("DYDX-NO-PARAMS");
6539        let config = DydxDataClientConfig::default();
6540
6541        let http_client = DydxHttpClient::new(
6542            config.base_url_http.clone(),
6543            config.http_timeout_secs,
6544            config.http_proxy_url.clone(),
6545            config.is_testnet,
6546            None,
6547        )
6548        .unwrap();
6549
6550        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6551
6552        let request = RequestInstruments::new(
6553            None,
6554            None,
6555            Some(client_id),
6556            Some(*DYDX_VENUE),
6557            UUID4::new(),
6558            get_atomic_clock_realtime().get_time_ns(),
6559            None, // No params
6560        );
6561
6562        assert!(client.request_instruments(&request).is_ok());
6563
6564        let timeout = tokio::time::Duration::from_secs(2);
6565        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6566            tokio::time::timeout(timeout, rx.recv()).await
6567        {
6568            // Verify params field exists and is None when not provided
6569            assert!(
6570                resp.params.is_none(),
6571                "params should be None when not provided"
6572            );
6573        }
6574    }
6575
6576    #[tokio::test]
6577    async fn test_instruments_response_complete_structure() {
6578        // Comprehensive test verifying all InstrumentsResponse fields
6579        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6580        set_data_event_sender(sender);
6581
6582        let client_id = ClientId::from("DYDX-FULL-TEST");
6583        let config = DydxDataClientConfig::default();
6584
6585        let http_client = DydxHttpClient::new(
6586            config.base_url_http.clone(),
6587            config.http_timeout_secs,
6588            config.http_proxy_url.clone(),
6589            config.is_testnet,
6590            None,
6591        )
6592        .unwrap();
6593
6594        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6595
6596        let request_id = UUID4::new();
6597        let start = Some(Utc::now() - chrono::Duration::hours(1));
6598        let end = Some(Utc::now());
6599        let ts_init = get_atomic_clock_realtime().get_time_ns();
6600
6601        let request = RequestInstruments::new(
6602            start,
6603            end,
6604            Some(client_id),
6605            Some(*DYDX_VENUE),
6606            request_id,
6607            ts_init,
6608            None,
6609        );
6610
6611        assert!(client.request_instruments(&request).is_ok());
6612
6613        let timeout = tokio::time::Duration::from_secs(2);
6614        if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6615            tokio::time::timeout(timeout, rx.recv()).await
6616        {
6617            // Comprehensive validation of all fields
6618            assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6619            assert_eq!(
6620                resp.correlation_id, request_id,
6621                "correlation_id should match"
6622            );
6623            assert_eq!(resp.client_id, client_id, "client_id should match");
6624            assert!(resp.ts_init > 0, "ts_init should be set");
6625
6626            // data field exists (Vec<InstrumentAny>)
6627            let _data: Vec<InstrumentAny> = resp.data;
6628
6629            // Timestamp fields can be present or None
6630            let _start = resp.start;
6631            let _end = resp.end;
6632            let _params = resp.params;
6633        }
6634    }
6635
6636    #[tokio::test]
6637    async fn test_instrument_response_properly_boxed() {
6638        // Verify InstrumentResponse is properly boxed in DataResponse
6639        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6640        set_data_event_sender(sender);
6641
6642        let client_id = ClientId::from("DYDX-BOXED-TEST");
6643        let config = DydxDataClientConfig::default();
6644
6645        let http_client = DydxHttpClient::new(
6646            config.base_url_http.clone(),
6647            config.http_timeout_secs,
6648            config.http_proxy_url.clone(),
6649            config.is_testnet,
6650            None,
6651        )
6652        .unwrap();
6653
6654        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6655
6656        let instrument = create_test_instrument_any();
6657        let instrument_id = instrument.id();
6658        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6659        client.instruments.insert(symbol_key, instrument);
6660
6661        let request = RequestInstrument::new(
6662            instrument_id,
6663            None,
6664            None,
6665            Some(client_id),
6666            UUID4::new(),
6667            get_atomic_clock_realtime().get_time_ns(),
6668            None,
6669        );
6670
6671        assert!(client.request_instrument(&request).is_ok());
6672
6673        let timeout = tokio::time::Duration::from_secs(2);
6674        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6675            tokio::time::timeout(timeout, rx.recv()).await
6676        {
6677            // Verify it's boxed - we receive Box<InstrumentResponse>
6678            let _response: Box<InstrumentResponse> = boxed_resp;
6679            // Successfully matched boxed pattern
6680        }
6681    }
6682
6683    #[tokio::test]
6684    async fn test_instrument_response_contains_single_instrument() {
6685        // Verify InstrumentResponse contains single InstrumentAny
6686        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6687        set_data_event_sender(sender);
6688
6689        let client_id = ClientId::from("DYDX-SINGLE-TEST");
6690        let config = DydxDataClientConfig::default();
6691
6692        let http_client = DydxHttpClient::new(
6693            config.base_url_http.clone(),
6694            config.http_timeout_secs,
6695            config.http_proxy_url.clone(),
6696            config.is_testnet,
6697            None,
6698        )
6699        .unwrap();
6700
6701        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6702
6703        let instrument = create_test_instrument_any();
6704        let instrument_id = instrument.id();
6705        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6706        client.instruments.insert(symbol_key, instrument.clone());
6707
6708        let request = RequestInstrument::new(
6709            instrument_id,
6710            None,
6711            None,
6712            Some(client_id),
6713            UUID4::new(),
6714            get_atomic_clock_realtime().get_time_ns(),
6715            None,
6716        );
6717
6718        assert!(client.request_instrument(&request).is_ok());
6719
6720        let timeout = tokio::time::Duration::from_secs(2);
6721        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6722            tokio::time::timeout(timeout, rx.recv()).await
6723        {
6724            // Verify data contains single InstrumentAny
6725            let _instrument: InstrumentAny = resp.data;
6726            // Successfully matched InstrumentAny type
6727        }
6728    }
6729
6730    #[tokio::test]
6731    async fn test_instrument_response_has_correct_instrument_id() {
6732        // Verify InstrumentResponse has correct instrument_id
6733        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6734        set_data_event_sender(sender);
6735
6736        let client_id = ClientId::from("DYDX-ID-TEST");
6737        let config = DydxDataClientConfig::default();
6738
6739        let http_client = DydxHttpClient::new(
6740            config.base_url_http.clone(),
6741            config.http_timeout_secs,
6742            config.http_proxy_url.clone(),
6743            config.is_testnet,
6744            None,
6745        )
6746        .unwrap();
6747
6748        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6749
6750        let instrument = create_test_instrument_any();
6751        let instrument_id = instrument.id();
6752        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6753        client.instruments.insert(symbol_key, instrument);
6754
6755        let request = RequestInstrument::new(
6756            instrument_id,
6757            None,
6758            None,
6759            Some(client_id),
6760            UUID4::new(),
6761            get_atomic_clock_realtime().get_time_ns(),
6762            None,
6763        );
6764
6765        assert!(client.request_instrument(&request).is_ok());
6766
6767        let timeout = tokio::time::Duration::from_secs(2);
6768        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6769            tokio::time::timeout(timeout, rx.recv()).await
6770        {
6771            // Verify instrument_id matches request
6772            assert_eq!(
6773                resp.instrument_id, instrument_id,
6774                "instrument_id should match requested ID"
6775            );
6776        }
6777    }
6778
6779    #[tokio::test]
6780    async fn test_instrument_response_includes_metadata() {
6781        // Verify InstrumentResponse includes all metadata fields
6782        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6783        set_data_event_sender(sender);
6784
6785        let client_id = ClientId::from("DYDX-META-TEST");
6786        let config = DydxDataClientConfig::default();
6787
6788        let http_client = DydxHttpClient::new(
6789            config.base_url_http.clone(),
6790            config.http_timeout_secs,
6791            config.http_proxy_url.clone(),
6792            config.is_testnet,
6793            None,
6794        )
6795        .unwrap();
6796
6797        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6798
6799        let instrument = create_test_instrument_any();
6800        let instrument_id = instrument.id();
6801        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6802        client.instruments.insert(symbol_key, instrument);
6803
6804        let request_id = UUID4::new();
6805        let start = Some(Utc::now() - chrono::Duration::hours(1));
6806        let end = Some(Utc::now());
6807        let ts_init = get_atomic_clock_realtime().get_time_ns();
6808
6809        let request = RequestInstrument::new(
6810            instrument_id,
6811            start,
6812            end,
6813            Some(client_id),
6814            request_id,
6815            ts_init,
6816            None,
6817        );
6818
6819        assert!(client.request_instrument(&request).is_ok());
6820
6821        let timeout = tokio::time::Duration::from_secs(2);
6822        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6823            tokio::time::timeout(timeout, rx.recv()).await
6824        {
6825            // Verify all metadata fields
6826            assert_eq!(
6827                resp.correlation_id, request_id,
6828                "correlation_id should match"
6829            );
6830            assert_eq!(resp.client_id, client_id, "client_id should match");
6831            assert!(resp.ts_init > 0, "ts_init should be set");
6832
6833            // Timestamp fields exist (can be Some or None)
6834            let _start = resp.start;
6835            let _end = resp.end;
6836
6837            // Params field exists
6838            let _params = resp.params;
6839        }
6840    }
6841
6842    #[tokio::test]
6843    async fn test_instrument_response_matches_requested_instrument() {
6844        // Verify InstrumentResponse data matches the requested instrument exactly
6845        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6846        set_data_event_sender(sender);
6847
6848        let client_id = ClientId::from("DYDX-MATCH-TEST");
6849        let config = DydxDataClientConfig::default();
6850
6851        let http_client = DydxHttpClient::new(
6852            config.base_url_http.clone(),
6853            config.http_timeout_secs,
6854            config.http_proxy_url.clone(),
6855            config.is_testnet,
6856            None,
6857        )
6858        .unwrap();
6859
6860        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6861
6862        let instrument = create_test_instrument_any();
6863        let instrument_id = instrument.id();
6864        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6865        client.instruments.insert(symbol_key, instrument.clone());
6866
6867        let request = RequestInstrument::new(
6868            instrument_id,
6869            None,
6870            None,
6871            Some(client_id),
6872            UUID4::new(),
6873            get_atomic_clock_realtime().get_time_ns(),
6874            None,
6875        );
6876
6877        assert!(client.request_instrument(&request).is_ok());
6878
6879        let timeout = tokio::time::Duration::from_secs(2);
6880        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6881            tokio::time::timeout(timeout, rx.recv()).await
6882        {
6883            // Verify returned instrument matches requested instrument
6884            assert_eq!(
6885                resp.data.id(),
6886                instrument_id,
6887                "Returned instrument should match requested"
6888            );
6889            assert_eq!(
6890                resp.instrument_id, instrument_id,
6891                "instrument_id field should match"
6892            );
6893
6894            // Both should point to the same instrument
6895            assert_eq!(
6896                resp.data.id(),
6897                resp.instrument_id,
6898                "data.id() should match instrument_id field"
6899            );
6900        }
6901    }
6902
6903    #[tokio::test]
6904    async fn test_instrument_response_complete_structure() {
6905        // Comprehensive test verifying all InstrumentResponse fields
6906        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6907        set_data_event_sender(sender);
6908
6909        let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6910        let config = DydxDataClientConfig::default();
6911
6912        let http_client = DydxHttpClient::new(
6913            config.base_url_http.clone(),
6914            config.http_timeout_secs,
6915            config.http_proxy_url.clone(),
6916            config.is_testnet,
6917            None,
6918        )
6919        .unwrap();
6920
6921        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6922
6923        let instrument = create_test_instrument_any();
6924        let instrument_id = instrument.id();
6925        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6926        client.instruments.insert(symbol_key, instrument.clone());
6927
6928        let request_id = UUID4::new();
6929        let ts_init = get_atomic_clock_realtime().get_time_ns();
6930
6931        let request = RequestInstrument::new(
6932            instrument_id,
6933            None,
6934            None,
6935            Some(client_id),
6936            request_id,
6937            ts_init,
6938            None,
6939        );
6940
6941        assert!(client.request_instrument(&request).is_ok());
6942
6943        let timeout = tokio::time::Duration::from_secs(2);
6944        if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6945            tokio::time::timeout(timeout, rx.recv()).await
6946        {
6947            // Comprehensive validation
6948            // 1. Boxed structure
6949            let _boxed: Box<InstrumentResponse> = resp.clone();
6950
6951            // 2. All required fields present
6952            assert_eq!(resp.correlation_id, request_id);
6953            assert_eq!(resp.client_id, client_id);
6954            assert_eq!(resp.instrument_id, instrument_id);
6955            assert!(resp.ts_init > 0);
6956
6957            // 3. Data field contains InstrumentAny
6958            let returned_instrument: InstrumentAny = resp.data;
6959            assert_eq!(returned_instrument.id(), instrument_id);
6960
6961            // 4. Optional fields exist
6962            let _start = resp.start;
6963            let _end = resp.end;
6964            let _params = resp.params;
6965        }
6966    }
6967
6968    #[tokio::test]
6969    async fn test_trades_response_contains_vec_trade_tick() {
6970        // Verify TradesResponse.data is Vec<TradeTick>
6971        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6972        set_data_event_sender(sender);
6973
6974        let created_at = Utc::now();
6975        let http_trades = vec![
6976            crate::http::models::Trade {
6977                id: "trade-1".to_string(),
6978                side: OrderSide::Buy,
6979                size: dec!(1.0),
6980                price: dec!(100.0),
6981                created_at,
6982                created_at_height: 100,
6983                trade_type: crate::common::enums::DydxTradeType::Limit,
6984            },
6985            crate::http::models::Trade {
6986                id: "trade-2".to_string(),
6987                side: OrderSide::Sell,
6988                size: dec!(2.0),
6989                price: dec!(101.0),
6990                created_at: created_at + chrono::Duration::seconds(1),
6991                created_at_height: 101,
6992                trade_type: crate::common::enums::DydxTradeType::Limit,
6993            },
6994        ];
6995
6996        let trades_response = crate::http::models::TradesResponse {
6997            trades: http_trades,
6998        };
6999
7000        let state = TradesTestState {
7001            response: Arc::new(trades_response),
7002            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7003            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7004        };
7005
7006        let addr = start_trades_test_server(state).await;
7007        let base_url = format!("http://{addr}");
7008
7009        let client_id = ClientId::from("DYDX-VEC-TEST");
7010        let config = DydxDataClientConfig {
7011            base_url_http: Some(base_url),
7012            is_testnet: true,
7013            ..Default::default()
7014        };
7015
7016        let http_client = DydxHttpClient::new(
7017            config.base_url_http.clone(),
7018            config.http_timeout_secs,
7019            config.http_proxy_url.clone(),
7020            config.is_testnet,
7021            None,
7022        )
7023        .unwrap();
7024
7025        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7026
7027        let instrument = create_test_instrument_any();
7028        let instrument_id = instrument.id();
7029        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7030        client.instruments.insert(symbol_key, instrument);
7031
7032        let request = RequestTrades::new(
7033            instrument_id,
7034            None,
7035            None,
7036            None,
7037            Some(client_id),
7038            UUID4::new(),
7039            get_atomic_clock_realtime().get_time_ns(),
7040            None,
7041        );
7042
7043        assert!(client.request_trades(&request).is_ok());
7044
7045        let timeout = tokio::time::Duration::from_millis(500);
7046        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7047            tokio::time::timeout(timeout, rx.recv()).await
7048        {
7049            // Verify data is Vec<TradeTick>
7050            let trade_ticks: Vec<TradeTick> = resp.data;
7051            assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
7052
7053            // Each element is a TradeTick
7054            for tick in &trade_ticks {
7055                assert_eq!(tick.instrument_id, instrument_id);
7056            }
7057        }
7058    }
7059
7060    #[tokio::test]
7061    async fn test_trades_response_has_correct_instrument_id() {
7062        // Verify TradesResponse.instrument_id matches request
7063        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7064        set_data_event_sender(sender);
7065
7066        let created_at = Utc::now();
7067        let http_trade = crate::http::models::Trade {
7068            id: "instrument-id-test".to_string(),
7069            side: OrderSide::Buy,
7070            size: dec!(1.0),
7071            price: dec!(100.0),
7072            created_at,
7073            created_at_height: 100,
7074            trade_type: crate::common::enums::DydxTradeType::Limit,
7075        };
7076
7077        let trades_response = crate::http::models::TradesResponse {
7078            trades: vec![http_trade],
7079        };
7080
7081        let state = TradesTestState {
7082            response: Arc::new(trades_response),
7083            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7084            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7085        };
7086
7087        let addr = start_trades_test_server(state).await;
7088        let base_url = format!("http://{addr}");
7089
7090        let client_id = ClientId::from("DYDX-INSTID-TEST");
7091        let config = DydxDataClientConfig {
7092            base_url_http: Some(base_url),
7093            is_testnet: true,
7094            ..Default::default()
7095        };
7096
7097        let http_client = DydxHttpClient::new(
7098            config.base_url_http.clone(),
7099            config.http_timeout_secs,
7100            config.http_proxy_url.clone(),
7101            config.is_testnet,
7102            None,
7103        )
7104        .unwrap();
7105
7106        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7107
7108        let instrument = create_test_instrument_any();
7109        let instrument_id = instrument.id();
7110        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7111        client.instruments.insert(symbol_key, instrument);
7112
7113        let request = RequestTrades::new(
7114            instrument_id,
7115            None,
7116            None,
7117            None,
7118            Some(client_id),
7119            UUID4::new(),
7120            get_atomic_clock_realtime().get_time_ns(),
7121            None,
7122        );
7123
7124        assert!(client.request_trades(&request).is_ok());
7125
7126        let timeout = tokio::time::Duration::from_millis(500);
7127        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7128            tokio::time::timeout(timeout, rx.recv()).await
7129        {
7130            // Verify instrument_id field matches request
7131            assert_eq!(
7132                resp.instrument_id, instrument_id,
7133                "TradesResponse.instrument_id should match request"
7134            );
7135
7136            // Verify all trade ticks have the same instrument_id
7137            for tick in &resp.data {
7138                assert_eq!(
7139                    tick.instrument_id, instrument_id,
7140                    "Each TradeTick should have correct instrument_id"
7141                );
7142            }
7143        }
7144    }
7145
7146    #[tokio::test]
7147    async fn test_trades_response_properly_ordered_by_timestamp() {
7148        // Verify trades are ordered by timestamp (ascending)
7149        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7150        set_data_event_sender(sender);
7151
7152        let base_time = Utc::now();
7153        let http_trades = vec![
7154            crate::http::models::Trade {
7155                id: "trade-oldest".to_string(),
7156                side: OrderSide::Buy,
7157                size: dec!(1.0),
7158                price: dec!(100.0),
7159                created_at: base_time,
7160                created_at_height: 100,
7161                trade_type: crate::common::enums::DydxTradeType::Limit,
7162            },
7163            crate::http::models::Trade {
7164                id: "trade-middle".to_string(),
7165                side: OrderSide::Sell,
7166                size: dec!(2.0),
7167                price: dec!(101.0),
7168                created_at: base_time + chrono::Duration::seconds(1),
7169                created_at_height: 101,
7170                trade_type: crate::common::enums::DydxTradeType::Limit,
7171            },
7172            crate::http::models::Trade {
7173                id: "trade-newest".to_string(),
7174                side: OrderSide::Buy,
7175                size: dec!(3.0),
7176                price: dec!(102.0),
7177                created_at: base_time + chrono::Duration::seconds(2),
7178                created_at_height: 102,
7179                trade_type: crate::common::enums::DydxTradeType::Limit,
7180            },
7181        ];
7182
7183        let trades_response = crate::http::models::TradesResponse {
7184            trades: http_trades,
7185        };
7186
7187        let state = TradesTestState {
7188            response: Arc::new(trades_response),
7189            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7190            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7191        };
7192
7193        let addr = start_trades_test_server(state).await;
7194        let base_url = format!("http://{addr}");
7195
7196        let client_id = ClientId::from("DYDX-ORDER-TEST");
7197        let config = DydxDataClientConfig {
7198            base_url_http: Some(base_url),
7199            is_testnet: true,
7200            ..Default::default()
7201        };
7202
7203        let http_client = DydxHttpClient::new(
7204            config.base_url_http.clone(),
7205            config.http_timeout_secs,
7206            config.http_proxy_url.clone(),
7207            config.is_testnet,
7208            None,
7209        )
7210        .unwrap();
7211
7212        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7213
7214        let instrument = create_test_instrument_any();
7215        let instrument_id = instrument.id();
7216        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7217        client.instruments.insert(symbol_key, instrument);
7218
7219        let request = RequestTrades::new(
7220            instrument_id,
7221            None,
7222            None,
7223            None,
7224            Some(client_id),
7225            UUID4::new(),
7226            get_atomic_clock_realtime().get_time_ns(),
7227            None,
7228        );
7229
7230        assert!(client.request_trades(&request).is_ok());
7231
7232        let timeout = tokio::time::Duration::from_millis(500);
7233        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7234            tokio::time::timeout(timeout, rx.recv()).await
7235        {
7236            // Verify trades are ordered by timestamp
7237            let trade_ticks = resp.data;
7238            assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7239
7240            // Check ascending timestamp order
7241            for i in 1..trade_ticks.len() {
7242                assert!(
7243                    trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7244                    "Trades should be ordered by timestamp (ts_event) in ascending order"
7245                );
7246            }
7247
7248            // Verify specific ordering
7249            assert!(
7250                trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7251                "First trade should be before second"
7252            );
7253            assert!(
7254                trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7255                "Second trade should be before third"
7256            );
7257        }
7258    }
7259
7260    #[tokio::test]
7261    async fn test_trades_response_all_trade_tick_fields_populated() {
7262        // Verify all TradeTick fields are properly populated
7263        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7264        set_data_event_sender(sender);
7265
7266        let created_at = Utc::now();
7267        let http_trade = crate::http::models::Trade {
7268            id: "field-test".to_string(),
7269            side: OrderSide::Buy,
7270            size: dec!(5.5),
7271            price: dec!(12345.67),
7272            created_at,
7273            created_at_height: 999,
7274            trade_type: crate::common::enums::DydxTradeType::Limit,
7275        };
7276
7277        let trades_response = crate::http::models::TradesResponse {
7278            trades: vec![http_trade],
7279        };
7280
7281        let state = TradesTestState {
7282            response: Arc::new(trades_response),
7283            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7284            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7285        };
7286
7287        let addr = start_trades_test_server(state).await;
7288        let base_url = format!("http://{addr}");
7289
7290        let client_id = ClientId::from("DYDX-FIELDS-TEST");
7291        let config = DydxDataClientConfig {
7292            base_url_http: Some(base_url),
7293            is_testnet: true,
7294            ..Default::default()
7295        };
7296
7297        let http_client = DydxHttpClient::new(
7298            config.base_url_http.clone(),
7299            config.http_timeout_secs,
7300            config.http_proxy_url.clone(),
7301            config.is_testnet,
7302            None,
7303        )
7304        .unwrap();
7305
7306        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7307
7308        let instrument = create_test_instrument_any();
7309        let instrument_id = instrument.id();
7310        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7311        client.instruments.insert(symbol_key, instrument);
7312
7313        let request = RequestTrades::new(
7314            instrument_id,
7315            None,
7316            None,
7317            None,
7318            Some(client_id),
7319            UUID4::new(),
7320            get_atomic_clock_realtime().get_time_ns(),
7321            None,
7322        );
7323
7324        assert!(client.request_trades(&request).is_ok());
7325
7326        let timeout = tokio::time::Duration::from_millis(500);
7327        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7328            tokio::time::timeout(timeout, rx.recv()).await
7329        {
7330            assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7331            let tick = &resp.data[0];
7332
7333            // Verify all TradeTick fields are properly populated
7334            assert_eq!(
7335                tick.instrument_id, instrument_id,
7336                "instrument_id should be set"
7337            );
7338            assert!(tick.price.as_f64() > 0.0, "price should be positive");
7339            assert!(tick.size.as_f64() > 0.0, "size should be positive");
7340
7341            // Verify aggressor_side is set (Buy or Sell)
7342            match tick.aggressor_side {
7343                AggressorSide::Buyer | AggressorSide::Seller => {
7344                    // Valid aggressor side
7345                }
7346                AggressorSide::NoAggressor => {
7347                    panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7348                }
7349            }
7350
7351            // Verify trade_id is set
7352            assert!(
7353                !tick.trade_id.to_string().is_empty(),
7354                "trade_id should be set"
7355            );
7356
7357            // Verify timestamps are set and valid
7358            assert!(tick.ts_event > 0, "ts_event should be set");
7359            assert!(tick.ts_init > 0, "ts_init should be set");
7360            assert!(
7361                tick.ts_init >= tick.ts_event,
7362                "ts_init should be >= ts_event"
7363            );
7364        }
7365    }
7366
7367    #[tokio::test]
7368    async fn test_trades_response_includes_metadata() {
7369        // Verify TradesResponse includes all metadata fields
7370        let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7371        set_data_event_sender(sender);
7372
7373        let created_at = Utc::now();
7374        let http_trade = crate::http::models::Trade {
7375            id: "metadata-test".to_string(),
7376            side: OrderSide::Buy,
7377            size: dec!(1.0),
7378            price: dec!(100.0),
7379            created_at,
7380            created_at_height: 100,
7381            trade_type: crate::common::enums::DydxTradeType::Limit,
7382        };
7383
7384        let trades_response = crate::http::models::TradesResponse {
7385            trades: vec![http_trade],
7386        };
7387
7388        let state = TradesTestState {
7389            response: Arc::new(trades_response),
7390            last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7391            last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7392        };
7393
7394        let addr = start_trades_test_server(state).await;
7395        let base_url = format!("http://{addr}");
7396
7397        let client_id = ClientId::from("DYDX-META-TEST");
7398        let config = DydxDataClientConfig {
7399            base_url_http: Some(base_url),
7400            is_testnet: true,
7401            ..Default::default()
7402        };
7403
7404        let http_client = DydxHttpClient::new(
7405            config.base_url_http.clone(),
7406            config.http_timeout_secs,
7407            config.http_proxy_url.clone(),
7408            config.is_testnet,
7409            None,
7410        )
7411        .unwrap();
7412
7413        let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7414
7415        let instrument = create_test_instrument_any();
7416        let instrument_id = instrument.id();
7417        let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7418        client.instruments.insert(symbol_key, instrument);
7419
7420        let request_id = UUID4::new();
7421        let request = RequestTrades::new(
7422            instrument_id,
7423            None,
7424            None,
7425            None,
7426            Some(client_id),
7427            request_id,
7428            get_atomic_clock_realtime().get_time_ns(),
7429            None,
7430        );
7431
7432        assert!(client.request_trades(&request).is_ok());
7433
7434        let timeout = tokio::time::Duration::from_millis(500);
7435        if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7436            tokio::time::timeout(timeout, rx.recv()).await
7437        {
7438            // Verify metadata fields
7439            assert_eq!(
7440                resp.correlation_id, request_id,
7441                "correlation_id should match request"
7442            );
7443            assert_eq!(resp.client_id, client_id, "client_id should be set");
7444            assert_eq!(
7445                resp.instrument_id, instrument_id,
7446                "instrument_id should be set"
7447            );
7448            assert!(resp.ts_init > 0, "ts_init should be set");
7449
7450            let _start = resp.start;
7451            let _end = resp.end;
7452            let _params = resp.params;
7453        }
7454    }
7455
7456    #[tokio::test]
7457    async fn test_orderbook_cache_growth_with_many_instruments() {
7458        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7459        set_data_event_sender(sender);
7460
7461        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7462        let config = DydxDataClientConfig {
7463            base_url_http: Some(base_url),
7464            is_testnet: true,
7465            ..Default::default()
7466        };
7467
7468        let http_client = DydxHttpClient::new(
7469            config.base_url_http.clone(),
7470            config.http_timeout_secs,
7471            config.http_proxy_url.clone(),
7472            config.is_testnet,
7473            None,
7474        )
7475        .unwrap();
7476
7477        let client =
7478            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7479
7480        let initial_capacity = client.order_books.capacity();
7481
7482        for i in 0..100 {
7483            let symbol = format!("INSTRUMENT-{i}");
7484            let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7485            client.order_books.insert(
7486                instrument_id,
7487                OrderBook::new(instrument_id, BookType::L2_MBP),
7488            );
7489        }
7490
7491        assert_eq!(client.order_books.len(), 100);
7492        assert!(client.order_books.capacity() >= initial_capacity);
7493
7494        client.order_books.clear();
7495        assert_eq!(client.order_books.len(), 0);
7496    }
7497
7498    #[rstest]
7499    fn test_instrument_id_validation_rejects_invalid_formats() {
7500        // InstrumentId::from() validates format and panics on invalid input
7501        let test_cases = vec![
7502            ("", "Empty string missing separator"),
7503            ("INVALID", "No venue separator"),
7504            ("NO-VENUE", "No venue separator"),
7505            (".DYDX", "Empty symbol"),
7506            ("SYMBOL.", "Empty venue"),
7507        ];
7508
7509        for (invalid_id, description) in test_cases {
7510            let result = std::panic::catch_unwind(|| InstrumentId::from(invalid_id));
7511            assert!(
7512                result.is_err(),
7513                "Expected {invalid_id} to panic: {description}"
7514            );
7515        }
7516    }
7517
7518    #[rstest]
7519    fn test_instrument_id_validation_accepts_valid_formats() {
7520        let valid_ids = vec![
7521            "BTC-USD-PERP.DYDX",
7522            "ETH-USD-PERP.DYDX",
7523            "SOL-USD.DYDX",
7524            "AVAX-USD-PERP.DYDX",
7525        ];
7526
7527        for valid_id in valid_ids {
7528            let instrument_id = InstrumentId::from(valid_id);
7529            assert!(
7530                !instrument_id.symbol.as_str().is_empty()
7531                    && !instrument_id.venue.as_str().is_empty(),
7532                "Expected {valid_id} to have non-empty symbol and venue"
7533            );
7534        }
7535    }
7536
7537    #[tokio::test]
7538    async fn test_request_bars_with_inverted_date_range() {
7539        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7540        set_data_event_sender(sender);
7541
7542        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7543        let config = DydxDataClientConfig {
7544            base_url_http: Some(base_url),
7545            is_testnet: true,
7546            ..Default::default()
7547        };
7548
7549        let http_client = DydxHttpClient::new(
7550            config.base_url_http.clone(),
7551            config.http_timeout_secs,
7552            config.http_proxy_url.clone(),
7553            config.is_testnet,
7554            None,
7555        )
7556        .unwrap();
7557
7558        let client =
7559            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7560
7561        let instrument = create_test_instrument_any();
7562        let instrument_id = instrument.id();
7563        client
7564            .instruments
7565            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7566
7567        let spec = BarSpecification {
7568            step: std::num::NonZeroUsize::new(1).unwrap(),
7569            aggregation: BarAggregation::Minute,
7570            price_type: PriceType::Last,
7571        };
7572        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7573
7574        let now = Utc::now();
7575        let start = Some(now);
7576        let end = Some(now - chrono::Duration::hours(1));
7577
7578        let request = RequestBars::new(
7579            bar_type,
7580            start,
7581            end,
7582            None,
7583            Some(ClientId::from("dydx_test")),
7584            UUID4::new(),
7585            get_atomic_clock_realtime().get_time_ns(),
7586            None,
7587        );
7588
7589        let result = client.request_bars(&request);
7590        assert!(result.is_ok());
7591    }
7592
7593    #[tokio::test]
7594    async fn test_request_bars_with_zero_limit() {
7595        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7596        set_data_event_sender(sender);
7597
7598        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7599        let config = DydxDataClientConfig {
7600            base_url_http: Some(base_url),
7601            is_testnet: true,
7602            ..Default::default()
7603        };
7604
7605        let http_client = DydxHttpClient::new(
7606            config.base_url_http.clone(),
7607            config.http_timeout_secs,
7608            config.http_proxy_url.clone(),
7609            config.is_testnet,
7610            None,
7611        )
7612        .unwrap();
7613
7614        let client =
7615            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7616
7617        let instrument = create_test_instrument_any();
7618        let instrument_id = instrument.id();
7619        client
7620            .instruments
7621            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7622
7623        let spec = BarSpecification {
7624            step: std::num::NonZeroUsize::new(1).unwrap(),
7625            aggregation: BarAggregation::Minute,
7626            price_type: PriceType::Last,
7627        };
7628        let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7629
7630        let request = RequestBars::new(
7631            bar_type,
7632            None,
7633            None,
7634            Some(std::num::NonZeroUsize::new(1).unwrap()),
7635            Some(ClientId::from("dydx_test")),
7636            UUID4::new(),
7637            get_atomic_clock_realtime().get_time_ns(),
7638            None,
7639        );
7640
7641        let result = client.request_bars(&request);
7642        assert!(result.is_ok());
7643    }
7644
7645    #[tokio::test]
7646    async fn test_request_trades_with_excessive_limit() {
7647        let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7648        set_data_event_sender(sender);
7649
7650        let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7651        let config = DydxDataClientConfig {
7652            base_url_http: Some(base_url),
7653            is_testnet: true,
7654            ..Default::default()
7655        };
7656
7657        let http_client = DydxHttpClient::new(
7658            config.base_url_http.clone(),
7659            config.http_timeout_secs,
7660            config.http_proxy_url.clone(),
7661            config.is_testnet,
7662            None,
7663        )
7664        .unwrap();
7665
7666        let client =
7667            DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7668
7669        let instrument = create_test_instrument_any();
7670        let instrument_id = instrument.id();
7671        client
7672            .instruments
7673            .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7674
7675        let request = RequestTrades::new(
7676            instrument_id,
7677            None,
7678            None,
7679            Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7680            Some(ClientId::from("dydx_test")),
7681            UUID4::new(),
7682            get_atomic_clock_realtime().get_time_ns(),
7683            None,
7684        );
7685
7686        let result = client.request_trades(&request);
7687        assert!(result.is_ok());
7688    }
7689
7690    #[rstest]
7691    fn test_candle_topic_format() {
7692        let instrument_id = InstrumentId::new(Symbol::from("BTC-USD-PERP"), Venue::from("DYDX"));
7693        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
7694        let resolution = "1MIN";
7695        let topic = format!("{ticker}/{resolution}");
7696
7697        assert_eq!(topic, "BTC-USD/1MIN");
7698        assert!(!topic.contains("-PERP"));
7699        assert!(!topic.contains(".DYDX"));
7700    }
7701}