Skip to main content

nautilus_dydx/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the dYdX adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::{DashMap, DashSet};
25use futures_util::{Stream, StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent, DataResponse,
31        data::{
32            BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
33            RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
34            SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments,
35            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
36            UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
37            UnsubscribeInstrument, UnsubscribeInstruments, UnsubscribeMarkPrices,
38            UnsubscribeQuotes, UnsubscribeTrades,
39        },
40    },
41};
42use nautilus_core::{
43    datetime::datetime_to_unix_nanos,
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47    data::{
48        Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, OrderBookDelta,
49        OrderBookDeltas, OrderBookDeltas_API, QuoteTick,
50    },
51    enums::{BookAction, BookType, OrderSide, RecordFlag},
52    identifiers::{ClientId, InstrumentId, Venue},
53    instruments::{Instrument, InstrumentAny},
54    orderbook::OrderBook,
55    types::Quantity,
56};
57use tokio::{task::JoinHandle, time::Duration};
58use tokio_util::sync::CancellationToken;
59
60use crate::{
61    common::{
62        consts::DYDX_VENUE, enums::DydxCandleResolution, instrument_cache::InstrumentCache,
63        parse::extract_raw_symbol,
64    },
65    config::DydxDataClientConfig,
66    http::client::DydxHttpClient,
67    websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage, handler::HandlerCommand},
68};
69
70/// Groups WebSocket message handling dependencies.
71struct WsMessageContext {
72    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
73    instrument_cache: Arc<InstrumentCache>,
74    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
75    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
76    ws_client: DydxWebSocketClient,
77    http_client: DydxHttpClient,
78    active_quote_subs: Arc<DashSet<InstrumentId>>,
79    active_delta_subs: Arc<DashSet<InstrumentId>>,
80    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
81    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
82    incomplete_bars: Arc<DashMap<BarType, Bar>>,
83    active_mark_price_subs: Arc<DashSet<InstrumentId>>,
84    active_index_price_subs: Arc<DashSet<InstrumentId>>,
85    active_funding_rate_subs: Arc<DashSet<InstrumentId>>,
86}
87
88/// dYdX data client for live market data streaming and historical data requests.
89///
90/// This client integrates with the Nautilus DataEngine to provide:
91/// - Real-time market data via WebSocket subscriptions
92/// - Historical data via REST API requests
93/// - Automatic instrument discovery and caching
94/// - Connection lifecycle management
95#[derive(Debug)]
96pub struct DydxDataClient {
97    /// High-resolution clock for timestamps.
98    clock: &'static AtomicTime,
99    /// The client ID for this data client.
100    client_id: ClientId,
101    /// Configuration for the data client.
102    config: DydxDataClientConfig,
103    /// HTTP client for REST API requests.
104    http_client: DydxHttpClient,
105    /// WebSocket client for real-time data streaming.
106    ws_client: DydxWebSocketClient,
107    /// Whether the client is currently connected.
108    is_connected: AtomicBool,
109    /// Cancellation token for async operations.
110    cancellation_token: CancellationToken,
111    /// Background task handles.
112    tasks: Vec<JoinHandle<()>>,
113    /// Channel sender for emitting data events to the DataEngine.
114    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
115    /// Shared instrument cache (with HTTP client and execution client).
116    instrument_cache: Arc<InstrumentCache>,
117    /// Local order books maintained for generating quotes and resolving crosses.
118    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
119    /// Last quote tick per instrument (used for quote generation from book deltas).
120    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
121    /// Incomplete bars cache for bar aggregation.
122    /// Tracks bars not yet closed (ts_event > current_time), keyed by BarType.
123    /// Bars are emitted only when they close (ts_event <= current_time).
124    incomplete_bars: Arc<DashMap<BarType, Bar>>,
125    /// WebSocket topic to BarType mappings.
126    /// Maps dYdX candle topics (e.g., "BTC-USD/1MIN") to Nautilus BarType.
127    /// Used for subscription validation and reconnection recovery.
128    bar_type_mappings: Arc<DashMap<String, BarType>>,
129    /// Active quote subscriptions (instruments expecting `QuoteTick` events).
130    active_quote_subs: Arc<DashSet<InstrumentId>>,
131    /// Active orderbook delta subscriptions (instruments expecting `OrderBookDeltas` events).
132    active_delta_subs: Arc<DashSet<InstrumentId>>,
133    /// Active trade subscriptions for reconnection recovery.
134    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
135    /// Active bar/candle subscriptions for reconnection recovery (maps instrument+resolution to BarType).
136    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
137    /// Active mark price subscriptions (instruments expecting `MarkPriceUpdate` events).
138    active_mark_price_subs: Arc<DashSet<InstrumentId>>,
139    /// Active index price subscriptions (instruments expecting `IndexPriceUpdate` events).
140    active_index_price_subs: Arc<DashSet<InstrumentId>>,
141    /// Active funding rate subscriptions (instruments expecting `FundingRateUpdate` events).
142    active_funding_rate_subs: Arc<DashSet<InstrumentId>>,
143}
144
145impl DydxDataClient {
146    fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
147        let resolution: &'static str = DydxCandleResolution::from_bar_spec(spec)?.into();
148        Ok(resolution)
149    }
150
151    /// Creates a new [`DydxDataClient`] instance.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the client fails to initialize.
156    pub fn new(
157        client_id: ClientId,
158        config: DydxDataClientConfig,
159        http_client: DydxHttpClient,
160        ws_client: DydxWebSocketClient,
161    ) -> anyhow::Result<Self> {
162        let clock = get_atomic_clock_realtime();
163        let data_sender = get_data_event_sender();
164
165        // Share the instrument cache from HTTP client
166        let instrument_cache = Arc::clone(http_client.instrument_cache());
167
168        Ok(Self {
169            clock,
170            client_id,
171            config,
172            http_client,
173            ws_client,
174            is_connected: AtomicBool::new(false),
175            cancellation_token: CancellationToken::new(),
176            tasks: Vec::new(),
177            data_sender,
178            instrument_cache,
179            order_books: Arc::new(DashMap::new()),
180            last_quotes: Arc::new(DashMap::new()),
181            incomplete_bars: Arc::new(DashMap::new()),
182            bar_type_mappings: Arc::new(DashMap::new()),
183            active_quote_subs: Arc::new(DashSet::new()),
184            active_delta_subs: Arc::new(DashSet::new()),
185            active_trade_subs: Arc::new(DashMap::new()),
186            active_bar_subs: Arc::new(DashMap::new()),
187            active_mark_price_subs: Arc::new(DashSet::new()),
188            active_index_price_subs: Arc::new(DashSet::new()),
189            active_funding_rate_subs: Arc::new(DashSet::new()),
190        })
191    }
192
193    /// Returns the venue for this data client.
194    #[must_use]
195    pub fn venue(&self) -> Venue {
196        *DYDX_VENUE
197    }
198
199    /// Returns a reference to the client configuration.
200    #[must_use]
201    pub fn config(&self) -> &DydxDataClientConfig {
202        &self.config
203    }
204
205    /// Returns `true` when the client is connected.
206    #[must_use]
207    pub fn is_connected(&self) -> bool {
208        self.is_connected.load(Ordering::Relaxed)
209    }
210
211    /// Spawns an async WebSocket task with error handling.
212    ///
213    /// This helper ensures consistent error logging across all subscription methods.
214    fn spawn_ws<F>(&self, fut: F, context: &'static str)
215    where
216        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
217    {
218        get_runtime().spawn(async move {
219            if let Err(e) = fut.await {
220                log::error!("{context}: {e:?}");
221            }
222        });
223    }
224
225    /// Spawns a stream handler to dispatch WebSocket messages to the data engine.
226    fn spawn_ws_stream_handler(
227        &mut self,
228        stream: impl Stream<Item = NautilusWsMessage> + Send + 'static,
229        ctx: WsMessageContext,
230    ) {
231        let cancellation = self.cancellation_token.clone();
232
233        let handle = get_runtime().spawn(async move {
234            log::debug!("Message processing task started");
235            pin_mut!(stream);
236
237            loop {
238                tokio::select! {
239                    maybe_msg = stream.next() => {
240                        match maybe_msg {
241                            Some(msg) => Self::handle_ws_message(msg, &ctx),
242                            None => {
243                                log::debug!("WebSocket message channel closed");
244                                break;
245                            }
246                        }
247                    }
248                    () = cancellation.cancelled() => {
249                        log::debug!("WebSocket message task cancelled");
250                        break;
251                    }
252                }
253            }
254            log::debug!("WebSocket stream handler ended");
255        });
256
257        self.tasks.push(handle);
258    }
259
260    /// Awaits all background tasks with a timeout for graceful shutdown.
261    ///
262    /// This ensures tasks are given a chance to complete cleanly after cancellation
263    /// rather than being abruptly dropped. Tasks that don't complete within the
264    /// timeout are allowed to continue running (will be cleaned up by tokio).
265    async fn await_tasks_with_timeout(&mut self, timeout: Duration) {
266        for handle in self.tasks.drain(..) {
267            let _ = tokio::time::timeout(timeout, handle).await;
268        }
269    }
270
271    /// Bootstrap instruments from the dYdX Indexer API.
272    ///
273    /// This method:
274    /// 1. Fetches all available instruments from the REST API
275    /// 2. Caches them in the HTTP client
276    /// 3. Caches them in the WebSocket client (if present)
277    /// 4. Populates the local instruments cache
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if:
282    /// - The HTTP request fails.
283    /// - Instrument parsing fails.
284    ///
285    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
286        // Fetch instruments via HTTP - this populates the shared InstrumentCache
287        self.http_client
288            .fetch_and_cache_instruments()
289            .await
290            .context("failed to load instruments from dYdX")?;
291
292        let instruments: Vec<InstrumentAny> = self.http_client.all_instruments();
293
294        if instruments.is_empty() {
295            log::warn!("No instruments were loaded");
296            return Ok(instruments);
297        }
298
299        log::info!("Loaded {} instruments into shared cache", instruments.len());
300
301        // Cache in WebSocket client for handler lookups
302        self.ws_client.cache_instruments(instruments.clone());
303
304        // Publish all instruments to the data engine so they're available in the shared Cache
305        for instrument in &instruments {
306            if let Err(e) = self
307                .data_sender
308                .send(DataEvent::Instrument(instrument.clone()))
309            {
310                log::warn!("Failed to publish instrument {}: {e}", instrument.id());
311            }
312        }
313        log::debug!("Published {} instruments to data engine", instruments.len());
314
315        Ok(instruments)
316    }
317}
318
319#[async_trait::async_trait(?Send)]
320impl DataClient for DydxDataClient {
321    fn client_id(&self) -> ClientId {
322        self.client_id
323    }
324
325    fn venue(&self) -> Option<Venue> {
326        Some(*DYDX_VENUE)
327    }
328
329    fn start(&mut self) -> anyhow::Result<()> {
330        log::info!(
331            "Starting: client_id={}, is_testnet={}",
332            self.client_id,
333            self.http_client.is_testnet()
334        );
335        Ok(())
336    }
337
338    fn stop(&mut self) -> anyhow::Result<()> {
339        log::info!("Stopping {}", self.client_id);
340        self.cancellation_token.cancel();
341        self.is_connected.store(false, Ordering::Relaxed);
342        Ok(())
343    }
344
345    fn reset(&mut self) -> anyhow::Result<()> {
346        log::debug!("Resetting {}", self.client_id);
347        self.is_connected.store(false, Ordering::Relaxed);
348        self.cancellation_token = CancellationToken::new();
349        // Abort remaining tasks instead of just dropping handles to prevent resource leaks
350        for handle in self.tasks.drain(..) {
351            handle.abort();
352        }
353        Ok(())
354    }
355
356    fn dispose(&mut self) -> anyhow::Result<()> {
357        log::debug!("Disposing {}", self.client_id);
358        self.stop()
359    }
360
361    async fn connect(&mut self) -> anyhow::Result<()> {
362        if self.is_connected() {
363            return Ok(());
364        }
365
366        log::info!("Connecting");
367
368        // Bootstrap instruments first
369        self.bootstrap_instruments().await?;
370
371        // Connect WebSocket client and subscribe to market updates
372        self.ws_client
373            .connect()
374            .await
375            .context("failed to connect dYdX websocket")?;
376
377        self.ws_client
378            .subscribe_markets()
379            .await
380            .context("failed to subscribe to markets channel")?;
381
382        // Start message processing task (handler already converts to NautilusWsMessage)
383        let ctx = WsMessageContext {
384            data_sender: self.data_sender.clone(),
385            instrument_cache: self.instrument_cache.clone(),
386            order_books: self.order_books.clone(),
387            last_quotes: self.last_quotes.clone(),
388            ws_client: self.ws_client.clone(),
389            http_client: self.http_client.clone(),
390            active_quote_subs: self.active_quote_subs.clone(),
391            active_delta_subs: self.active_delta_subs.clone(),
392            active_trade_subs: self.active_trade_subs.clone(),
393            active_bar_subs: self.active_bar_subs.clone(),
394            incomplete_bars: self.incomplete_bars.clone(),
395            active_mark_price_subs: self.active_mark_price_subs.clone(),
396            active_index_price_subs: self.active_index_price_subs.clone(),
397            active_funding_rate_subs: self.active_funding_rate_subs.clone(),
398        };
399
400        let stream = self.ws_client.stream();
401        self.spawn_ws_stream_handler(stream, ctx);
402
403        self.is_connected.store(true, Ordering::Relaxed);
404        log::info!("Connected");
405
406        Ok(())
407    }
408
409    async fn disconnect(&mut self) -> anyhow::Result<()> {
410        if !self.is_connected() {
411            return Ok(());
412        }
413
414        log::info!("Disconnecting");
415
416        // Cancel all tasks
417        self.cancellation_token.cancel();
418
419        // Await tasks with timeout for graceful shutdown
420        self.await_tasks_with_timeout(Duration::from_secs(5)).await;
421
422        self.ws_client
423            .disconnect()
424            .await
425            .context("failed to disconnect dYdX websocket")?;
426
427        self.is_connected.store(false, Ordering::Relaxed);
428        log::info!("Disconnected dYdX data client");
429
430        Ok(())
431    }
432
433    fn is_connected(&self) -> bool {
434        self.is_connected.load(Ordering::Relaxed)
435    }
436
437    fn is_disconnected(&self) -> bool {
438        !self.is_connected()
439    }
440
441    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
442        // dYdX uses a global markets channel which streams instruments implicitly.
443        // There is no dedicated instruments subscription, so this is a no-op to
444        // mirror the behaviour of `subscribe_instruments`.
445        log::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
446        Ok(())
447    }
448
449    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
450        // dYdX does not support per-instrument instrument feed subscriptions.
451        // The markets channel always streams all instruments, so this is a no-op.
452        log::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
453        Ok(())
454    }
455
456    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
457        // dYdX markets channel auto-subscribes to all instruments
458        // No explicit subscription needed - already handled in connect()
459        log::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
460        Ok(())
461    }
462
463    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
464        // dYdX instruments are already cached from HTTP during connect()
465        // Look up and send the requested instrument to the data engine
466        if let Some(instrument) = self.instrument_cache.get(&cmd.instrument_id) {
467            log::debug!("Sending cached instrument for {}", cmd.instrument_id);
468            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
469                log::warn!("Failed to send instrument {}: {e}", cmd.instrument_id);
470            }
471        } else {
472            log::warn!(
473                "Instrument {} not found in cache (available: {})",
474                cmd.instrument_id,
475                self.instrument_cache.len()
476            );
477        }
478        Ok(())
479    }
480
481    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
482        let instrument_id = cmd.instrument_id;
483        self.active_mark_price_subs.insert(instrument_id);
484        log::info!("Subscribed to mark prices for {instrument_id} (via v4_markets channel)");
485        Ok(())
486    }
487
488    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
489        let instrument_id = cmd.instrument_id;
490        self.active_index_price_subs.insert(instrument_id);
491        log::info!("Subscribed to index prices for {instrument_id} (via v4_markets channel)");
492        Ok(())
493    }
494
495    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
496        let instrument_id = cmd.instrument_id;
497        self.active_funding_rate_subs.insert(instrument_id);
498        log::info!("Subscribed to funding rates for {instrument_id} (via v4_markets channel)");
499        Ok(())
500    }
501
502    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
503        self.active_mark_price_subs.remove(&cmd.instrument_id);
504        log::info!("Unsubscribed from mark prices for {}", cmd.instrument_id);
505        Ok(())
506    }
507
508    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
509        self.active_index_price_subs.remove(&cmd.instrument_id);
510        log::info!("Unsubscribed from index prices for {}", cmd.instrument_id);
511        Ok(())
512    }
513
514    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
515        self.active_funding_rate_subs.remove(&cmd.instrument_id);
516        log::info!("Unsubscribed from funding rates for {}", cmd.instrument_id);
517        Ok(())
518    }
519
520    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
521        let ws = self.ws_client.clone();
522        let instrument_id = cmd.instrument_id;
523
524        // Track active subscription for reconnection recovery
525        self.active_trade_subs.insert(instrument_id, ());
526
527        self.spawn_ws(
528            async move {
529                ws.subscribe_trades(instrument_id)
530                    .await
531                    .context("trade subscription")
532            },
533            "dYdX trade subscription",
534        );
535
536        Ok(())
537    }
538
539    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
540        if cmd.book_type != BookType::L2_MBP {
541            anyhow::bail!(
542                "dYdX only supports L2_MBP order book deltas, received {:?}",
543                cmd.book_type
544            );
545        }
546
547        // Ensure local order book exists for this instrument.
548        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
549
550        // Track active delta subscription
551        self.active_delta_subs.insert(cmd.instrument_id);
552
553        let ws = self.ws_client.clone();
554        let instrument_id = cmd.instrument_id;
555
556        self.spawn_ws(
557            async move {
558                ws.subscribe_orderbook(instrument_id)
559                    .await
560                    .context("orderbook subscription")
561            },
562            "dYdX orderbook subscription",
563        );
564
565        Ok(())
566    }
567
568    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
569        // dYdX doesn't have a dedicated quotes channel —
570        // quotes are synthesized from order book deltas (top-of-book).
571        log::debug!(
572            "Subscribe_quotes for {}: subscribing to orderbook WS channel for quote synthesis",
573            cmd.instrument_id
574        );
575
576        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
577        self.active_quote_subs.insert(cmd.instrument_id);
578        let ws = self.ws_client.clone();
579        let instrument_id = cmd.instrument_id;
580
581        self.spawn_ws(
582            async move {
583                ws.subscribe_orderbook(instrument_id)
584                    .await
585                    .context("orderbook subscription (for quotes)")
586            },
587            "dYdX orderbook subscription (quotes)",
588        );
589
590        Ok(())
591    }
592
593    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
594        let ws = self.ws_client.clone();
595        let instrument_id = cmd.bar_type.instrument_id();
596        let spec = cmd.bar_type.spec();
597
598        // Use centralized bar spec mapping
599        let resolution = Self::map_bar_spec_to_resolution(&spec)?;
600
601        // Track active subscription for reconnection recovery
602        let bar_type = cmd.bar_type;
603        self.active_bar_subs
604            .insert((instrument_id, resolution.to_string()), bar_type);
605
606        // Register topic → BarType mapping for validation and lookup
607        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
608        let topic = format!("{ticker}/{resolution}");
609        self.bar_type_mappings.insert(topic.clone(), bar_type);
610
611        self.spawn_ws(
612            async move {
613                // Register bar type in handler BEFORE subscribing to avoid race condition
614                if let Err(e) = ws.send_command(HandlerCommand::RegisterBarType { topic, bar_type })
615                {
616                    anyhow::bail!("Failed to register bar type: {e}");
617                }
618
619                // Delay to ensure handler processes registration before candle messages arrive
620                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
621
622                ws.subscribe_candles(instrument_id, resolution)
623                    .await
624                    .context("candles subscription")
625            },
626            "dYdX candles subscription",
627        );
628
629        Ok(())
630    }
631
632    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
633        // Remove from active subscription tracking
634        self.active_trade_subs.remove(&cmd.instrument_id);
635
636        let ws = self.ws_client.clone();
637        let instrument_id = cmd.instrument_id;
638
639        self.spawn_ws(
640            async move {
641                ws.unsubscribe_trades(instrument_id)
642                    .await
643                    .context("trade unsubscription")
644            },
645            "dYdX trade unsubscription",
646        );
647
648        Ok(())
649    }
650
651    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
652        // Remove from active delta subscription tracking
653        self.active_delta_subs.remove(&cmd.instrument_id);
654
655        let ws = self.ws_client.clone();
656        let instrument_id = cmd.instrument_id;
657
658        self.spawn_ws(
659            async move {
660                ws.unsubscribe_orderbook(instrument_id)
661                    .await
662                    .context("orderbook unsubscription")
663            },
664            "dYdX orderbook unsubscription",
665        );
666
667        Ok(())
668    }
669
670    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
671        log::debug!(
672            "unsubscribe_quotes for {}: removing quote subscription",
673            cmd.instrument_id
674        );
675
676        // Remove from active quote subscription tracking
677        self.active_quote_subs.remove(&cmd.instrument_id);
678
679        // Unsubscribe from WS orderbook channel (refcount handles dedup —
680        // only sends WS unsubscribe when no delta sub remains either)
681        let ws = self.ws_client.clone();
682        let instrument_id = cmd.instrument_id;
683
684        self.spawn_ws(
685            async move {
686                ws.unsubscribe_orderbook(instrument_id)
687                    .await
688                    .context("orderbook unsubscription (for quotes)")
689            },
690            "dYdX orderbook unsubscription (quotes)",
691        );
692
693        Ok(())
694    }
695
696    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
697        let ws = self.ws_client.clone();
698        let instrument_id = cmd.bar_type.instrument_id();
699        let spec = cmd.bar_type.spec();
700
701        let resolution = Self::map_bar_spec_to_resolution(&spec)?;
702
703        // Remove from active subscription tracking
704        self.active_bar_subs
705            .remove(&(instrument_id, resolution.to_string()));
706
707        // Unregister bar type from handler and local mappings
708        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
709        let topic = format!("{ticker}/{resolution}");
710        self.bar_type_mappings.remove(&topic);
711
712        if let Err(e) = ws.send_command(HandlerCommand::UnregisterBarType { topic }) {
713            log::warn!("Failed to unregister bar type: {e}");
714        }
715
716        self.spawn_ws(
717            async move {
718                ws.unsubscribe_candles(instrument_id, resolution)
719                    .await
720                    .context("candles unsubscription")
721            },
722            "dYdX candles unsubscription",
723        );
724
725        Ok(())
726    }
727
728    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
729        if request.start.is_some() {
730            log::warn!(
731                "Requesting instrument {} with specified `start` which has no effect",
732                request.instrument_id
733            );
734        }
735        if request.end.is_some() {
736            log::warn!(
737                "Requesting instrument {} with specified `end` which has no effect",
738                request.instrument_id
739            );
740        }
741
742        let instrument_cache = self.instrument_cache.clone();
743        let sender = self.data_sender.clone();
744        let http = self.http_client.clone();
745        let instrument_id = request.instrument_id;
746        let request_id = request.request_id;
747        let client_id = request.client_id.unwrap_or(self.client_id);
748        let start = request.start;
749        let end = request.end;
750        let params = request.params;
751        let clock = self.clock;
752        let start_nanos = datetime_to_unix_nanos(start);
753        let end_nanos = datetime_to_unix_nanos(end);
754
755        get_runtime().spawn(async move {
756            // First try to get from cache
757            let instrument = if let Some(cached) = instrument_cache.get(&instrument_id) {
758                log::debug!("Found instrument {instrument_id} in cache");
759                Some(cached)
760            } else {
761                // Not in cache, fetch from API
762                log::debug!("Instrument {instrument_id} not in cache, fetching from API");
763                match http.request_instruments(None, None, None).await {
764                    Ok(instruments) => {
765                        // Cache all fetched instruments
766                        for inst in &instruments {
767                            instrument_cache.insert_instrument_only(inst.clone());
768                        }
769                        // Find the requested instrument
770                        instruments.into_iter().find(|i| i.id() == instrument_id)
771                    }
772                    Err(e) => {
773                        log::error!("Failed to fetch instruments from dYdX: {e:?}");
774                        None
775                    }
776                }
777            };
778
779            if let Some(inst) = instrument {
780                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
781                    request_id,
782                    client_id,
783                    instrument_id,
784                    inst,
785                    start_nanos,
786                    end_nanos,
787                    clock.get_time_ns(),
788                    params,
789                )));
790
791                if let Err(e) = sender.send(DataEvent::Response(response)) {
792                    log::error!("Failed to send instrument response: {e}");
793                }
794            } else {
795                log::error!("Instrument {instrument_id} not found");
796            }
797        });
798
799        Ok(())
800    }
801
802    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
803        let http = self.http_client.clone();
804        let sender = self.data_sender.clone();
805        let instrument_cache = self.instrument_cache.clone();
806        let request_id = request.request_id;
807        let client_id = request.client_id.unwrap_or(self.client_id);
808        let venue = self.venue();
809        let start = request.start;
810        let end = request.end;
811        let params = request.params;
812        let clock = self.clock;
813        let start_nanos = datetime_to_unix_nanos(start);
814        let end_nanos = datetime_to_unix_nanos(end);
815
816        get_runtime().spawn(async move {
817            match http.request_instruments(None, None, None).await {
818                Ok(instruments) => {
819                    log::info!("Fetched {} instruments from dYdX", instruments.len());
820
821                    // Cache all instruments
822                    for instrument in &instruments {
823                        instrument_cache.insert_instrument_only(instrument.clone());
824                    }
825
826                    let response = DataResponse::Instruments(InstrumentsResponse::new(
827                        request_id,
828                        client_id,
829                        venue,
830                        instruments,
831                        start_nanos,
832                        end_nanos,
833                        clock.get_time_ns(),
834                        params,
835                    ));
836
837                    if let Err(e) = sender.send(DataEvent::Response(response)) {
838                        log::error!("Failed to send instruments response: {e}");
839                    }
840                }
841                Err(e) => {
842                    log::error!("Failed to fetch instruments from dYdX: {e:?}");
843
844                    // Send empty response on error
845                    let response = DataResponse::Instruments(InstrumentsResponse::new(
846                        request_id,
847                        client_id,
848                        venue,
849                        Vec::new(),
850                        start_nanos,
851                        end_nanos,
852                        clock.get_time_ns(),
853                        params,
854                    ));
855
856                    if let Err(e) = sender.send(DataEvent::Response(response)) {
857                        log::error!("Failed to send empty instruments response: {e}");
858                    }
859                }
860            }
861        });
862
863        Ok(())
864    }
865
866    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
867        let http_client = self.http_client.clone();
868        let sender = self.data_sender.clone();
869        let instrument_id = request.instrument_id;
870        let start = request.start;
871        let end = request.end;
872        let limit = request.limit.map(|n| n.get() as u32);
873        let request_id = request.request_id;
874        let client_id = request.client_id.unwrap_or(self.client_id);
875        let params = request.params;
876        let clock = self.clock;
877        let start_nanos = datetime_to_unix_nanos(start);
878        let end_nanos = datetime_to_unix_nanos(end);
879
880        get_runtime().spawn(async move {
881            match http_client
882                .request_trade_ticks(instrument_id, start, end, limit)
883                .await
884                .context("failed to request trades from dYdX")
885            {
886                Ok(trades) => {
887                    let response = DataResponse::Trades(TradesResponse::new(
888                        request_id,
889                        client_id,
890                        instrument_id,
891                        trades,
892                        start_nanos,
893                        end_nanos,
894                        clock.get_time_ns(),
895                        params,
896                    ));
897                    if let Err(e) = sender.send(DataEvent::Response(response)) {
898                        log::error!("Failed to send trades response: {e}");
899                    }
900                }
901                Err(e) => log::error!("Trade request failed for {instrument_id}: {e:?}"),
902            }
903        });
904
905        Ok(())
906    }
907
908    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
909        let http_client = self.http_client.clone();
910        let sender = self.data_sender.clone();
911        let bar_type = request.bar_type;
912        let start = request.start;
913        let end = request.end;
914        let limit = request.limit.map(|n| n.get() as u32);
915        let request_id = request.request_id;
916        let client_id = request.client_id.unwrap_or(self.client_id);
917        let params = request.params;
918        let clock = self.clock;
919        let start_nanos = datetime_to_unix_nanos(start);
920        let end_nanos = datetime_to_unix_nanos(end);
921
922        get_runtime().spawn(async move {
923            match http_client
924                .request_bars(bar_type, start, end, limit, true)
925                .await
926                .context("failed to request bars from dYdX")
927            {
928                Ok(bars) => {
929                    let response = DataResponse::Bars(BarsResponse::new(
930                        request_id,
931                        client_id,
932                        bar_type,
933                        bars,
934                        start_nanos,
935                        end_nanos,
936                        clock.get_time_ns(),
937                        params,
938                    ));
939                    if let Err(e) = sender.send(DataEvent::Response(response)) {
940                        log::error!("Failed to send bars response: {e}");
941                    }
942                }
943                Err(e) => log::error!("Bar request failed for {bar_type}: {e:?}"),
944            }
945        });
946
947        Ok(())
948    }
949}
950
951impl DydxDataClient {
952    /// Get a cached instrument by InstrumentId.
953    #[must_use]
954    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
955        self.instrument_cache.get(instrument_id)
956    }
957
958    /// Get all cached instruments.
959    #[must_use]
960    pub fn get_instruments(&self) -> Vec<InstrumentAny> {
961        self.instrument_cache.all_instruments()
962    }
963
964    /// Cache a single instrument.
965    pub fn cache_instrument(&self, instrument: InstrumentAny) {
966        self.instrument_cache.insert_instrument_only(instrument);
967    }
968
969    /// Cache multiple instruments.
970    ///
971    /// Clears the existing cache first, then adds all provided instruments.
972    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
973        self.instrument_cache.clear();
974        self.instrument_cache.insert_instruments_only(instruments);
975    }
976
977    fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
978        self.order_books
979            .entry(instrument_id)
980            .or_insert_with(|| OrderBook::new(instrument_id, book_type));
981    }
982
983    /// Get BarType for a given WebSocket candle topic.
984    #[must_use]
985    pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
986        self.bar_type_mappings
987            .get(topic)
988            .map(|entry| *entry.value())
989    }
990
991    /// Get all registered bar topics.
992    #[must_use]
993    pub fn get_bar_topics(&self) -> Vec<String> {
994        self.bar_type_mappings
995            .iter()
996            .map(|entry| entry.key().clone())
997            .collect()
998    }
999
1000    fn handle_ws_message(message: NautilusWsMessage, ctx: &WsMessageContext) {
1001        match message {
1002            NautilusWsMessage::Data(payloads) => {
1003                Self::handle_data_message(payloads, &ctx.data_sender, &ctx.incomplete_bars);
1004            }
1005            NautilusWsMessage::Deltas(deltas) => {
1006                Self::handle_deltas_message(
1007                    *deltas,
1008                    &ctx.data_sender,
1009                    &ctx.order_books,
1010                    &ctx.last_quotes,
1011                    &ctx.instrument_cache,
1012                    &ctx.active_quote_subs,
1013                    &ctx.active_delta_subs,
1014                );
1015            }
1016            NautilusWsMessage::MarkPrice(mark_price) => {
1017                if ctx
1018                    .active_mark_price_subs
1019                    .contains(&mark_price.instrument_id)
1020                {
1021                    let data = NautilusData::MarkPriceUpdate(mark_price);
1022                    if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1023                        log::error!("Failed to emit mark price: {e}");
1024                    }
1025                }
1026            }
1027            NautilusWsMessage::IndexPrice(index_price) => {
1028                if ctx
1029                    .active_index_price_subs
1030                    .contains(&index_price.instrument_id)
1031                {
1032                    let data = NautilusData::IndexPriceUpdate(index_price);
1033                    if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1034                        log::error!("Failed to emit index price: {e}");
1035                    }
1036                }
1037            }
1038            NautilusWsMessage::FundingRate(funding_rate) => {
1039                if ctx
1040                    .active_funding_rate_subs
1041                    .contains(&funding_rate.instrument_id)
1042                    && let Err(e) = ctx.data_sender.send(DataEvent::FundingRate(funding_rate))
1043                {
1044                    log::error!("Failed to emit funding rate: {e}");
1045                }
1046            }
1047            NautilusWsMessage::Error(err) => {
1048                log::error!("dYdX WS error: {err}");
1049            }
1050            NautilusWsMessage::Reconnected => {
1051                log::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1052
1053                let total_subs = ctx.active_quote_subs.len()
1054                    + ctx.active_delta_subs.len()
1055                    + ctx.active_trade_subs.len()
1056                    + ctx.active_bar_subs.len();
1057
1058                if total_subs == 0 {
1059                    log::debug!("No active subscriptions to restore");
1060                    return;
1061                }
1062
1063                log::info!(
1064                    "Restoring {} subscriptions (quotes={}, deltas={}, trades={}, bars={})",
1065                    total_subs,
1066                    ctx.active_quote_subs.len(),
1067                    ctx.active_delta_subs.len(),
1068                    ctx.active_trade_subs.len(),
1069                    ctx.active_bar_subs.len()
1070                );
1071
1072                // Re-subscribe for quote subscriptions (bumps WS refcount)
1073                for instrument_id in ctx.active_quote_subs.iter() {
1074                    let instrument_id = *instrument_id;
1075                    let ws_clone = ctx.ws_client.clone();
1076                    get_runtime().spawn(async move {
1077                        if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1078                            log::error!(
1079                                "Failed to re-subscribe to orderbook (quotes) for {instrument_id}: {e:?}"
1080                            );
1081                        } else {
1082                            log::debug!("Re-subscribed to orderbook (quotes) for {instrument_id}");
1083                        }
1084                    });
1085                }
1086
1087                // Re-subscribe for delta subscriptions (bumps WS refcount)
1088                for instrument_id in ctx.active_delta_subs.iter() {
1089                    let instrument_id = *instrument_id;
1090                    let ws_clone = ctx.ws_client.clone();
1091                    get_runtime().spawn(async move {
1092                        if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1093                            log::error!(
1094                                "Failed to re-subscribe to orderbook (deltas) for {instrument_id}: {e:?}"
1095                            );
1096                        } else {
1097                            log::debug!("Re-subscribed to orderbook (deltas) for {instrument_id}");
1098                        }
1099                    });
1100                }
1101
1102                // Re-subscribe to trade channels
1103                for entry in ctx.active_trade_subs.iter() {
1104                    let instrument_id = *entry.key();
1105                    let ws_clone = ctx.ws_client.clone();
1106                    get_runtime().spawn(async move {
1107                        if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1108                            log::error!(
1109                                "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1110                            );
1111                        } else {
1112                            log::debug!("Re-subscribed to trades for {instrument_id}");
1113                        }
1114                    });
1115                }
1116
1117                // Re-subscribe to candle/bar channels
1118                for entry in ctx.active_bar_subs.iter() {
1119                    let (instrument_id, resolution) = entry.key();
1120                    let instrument_id = *instrument_id;
1121                    let resolution = resolution.clone();
1122                    let bar_type = *entry.value();
1123                    let ws_clone = ctx.ws_client.clone();
1124
1125                    // Re-register bar type with handler
1126                    let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1127                    let topic = format!("{ticker}/{resolution}");
1128                    if let Err(e) = ctx
1129                        .ws_client
1130                        .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
1131                    {
1132                        log::warn!(
1133                            "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1134                        );
1135                    }
1136
1137                    get_runtime().spawn(async move {
1138                        if let Err(e) =
1139                            ws_clone.subscribe_candles(instrument_id, &resolution).await
1140                        {
1141                            log::error!(
1142                                "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1143                            );
1144                        } else {
1145                            log::debug!(
1146                                "Re-subscribed to candles for {instrument_id} ({resolution})"
1147                            );
1148                        }
1149                    });
1150                }
1151
1152                log::info!("Completed re-subscription requests after reconnection");
1153            }
1154            NautilusWsMessage::BlockHeight { .. } => {
1155                log::debug!(
1156                    "Ignoring block height message on dYdX data client (handled by execution adapter)"
1157                );
1158            }
1159            NautilusWsMessage::Order(_)
1160            | NautilusWsMessage::Fill(_)
1161            | NautilusWsMessage::Position(_)
1162            | NautilusWsMessage::AccountState(_)
1163            | NautilusWsMessage::SubaccountSubscribed(_)
1164            | NautilusWsMessage::SubaccountsChannelData(_) => {
1165                log::debug!(
1166                    "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1167                );
1168            }
1169            NautilusWsMessage::NewInstrumentDiscovered { ticker } => {
1170                // New instrument discovered via WebSocket - fetch via HTTP and cache
1171                log::info!("New instrument discovered via WebSocket: {ticker}");
1172
1173                let http_client = ctx.http_client.clone();
1174                let ws_client = ctx.ws_client.clone();
1175                let data_sender = ctx.data_sender.clone();
1176
1177                get_runtime().spawn(async move {
1178                    match http_client.fetch_and_cache_single_instrument(&ticker).await {
1179                        Ok(Some(instrument)) => {
1180                            // Cache in WebSocket client for future data parsing
1181                            ws_client.cache_instrument(instrument.clone());
1182                            // The InstrumentCache is already updated by fetch_and_cache_single_instrument
1183
1184                            // Send to data engine
1185                            if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
1186                                log::error!("Failed to emit new instrument: {e}");
1187                            }
1188                            log::info!("Fetched and cached new instrument: {ticker}");
1189                        }
1190                        Ok(None) => {
1191                            log::warn!("New instrument {ticker} not found or inactive");
1192                        }
1193                        Err(e) => {
1194                            log::error!("Failed to fetch new instrument {ticker}: {e}");
1195                        }
1196                    }
1197                });
1198            }
1199        }
1200    }
1201
1202    fn handle_data_message(
1203        payloads: Vec<NautilusData>,
1204        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1205        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1206    ) {
1207        for data in payloads {
1208            // Filter bars through incomplete bars cache
1209            if let NautilusData::Bar(bar) = data {
1210                Self::handle_bar_message(bar, data_sender, incomplete_bars);
1211            } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1212                log::error!("Failed to emit data event: {e}");
1213            }
1214        }
1215    }
1216
1217    /// Handles bar messages by tracking incomplete bars and only emitting completed ones.
1218    ///
1219    /// WebSocket candle updates arrive continuously. This method:
1220    /// - Caches bars where ts_event > current_time (incomplete)
1221    /// - Emits bars where ts_event <= current_time (complete)
1222    /// - Updates cached incomplete bars with latest data
1223    fn handle_bar_message(
1224        bar: Bar,
1225        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1226        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1227    ) {
1228        let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1229        let bar_type = bar.bar_type;
1230
1231        if bar.ts_event <= current_time_ns {
1232            // Bar is complete - emit it and remove from incomplete cache
1233            incomplete_bars.remove(&bar_type);
1234            if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1235                log::error!("Failed to emit completed bar: {e}");
1236            }
1237        } else {
1238            // Bar is incomplete - cache it (updates existing entry)
1239            log::trace!(
1240                "Caching incomplete bar for {} (ts_event={}, current={})",
1241                bar_type,
1242                bar.ts_event,
1243                current_time_ns
1244            );
1245            incomplete_bars.insert(bar_type, bar);
1246        }
1247    }
1248
1249    /// Resolves a crossed order book by generating synthetic deltas to uncross it.
1250    ///
1251    /// dYdX order books can become crossed due to:
1252    /// - Validator delays in order acknowledgment across the network
1253    /// - Missed or delayed WebSocket messages from the venue
1254    ///
1255    /// This function detects when bid_price >= ask_price and iteratively removes
1256    /// the smaller side while adjusting the larger side until the book is uncrossed.
1257    ///
1258    /// # Algorithm
1259    ///
1260    /// For each crossed level:
1261    /// - If bid_size > ask_size: DELETE ask, UPDATE bid (reduce by ask_size)
1262    /// - If bid_size < ask_size: DELETE bid, UPDATE ask (reduce by bid_size)
1263    /// - If bid_size == ask_size: DELETE both bid and ask
1264    ///
1265    /// The algorithm continues until no more crosses exist or the book is empty.
1266    fn resolve_crossed_order_book(
1267        book: &mut OrderBook,
1268        venue_deltas: OrderBookDeltas,
1269        instrument: &InstrumentAny,
1270    ) -> anyhow::Result<OrderBookDeltas> {
1271        let instrument_id = venue_deltas.instrument_id;
1272        let ts_init = venue_deltas.ts_init;
1273        let mut all_deltas = venue_deltas.deltas.clone();
1274
1275        // Apply the original venue deltas first
1276        book.apply_deltas(&venue_deltas)?;
1277
1278        // Check if orderbook is crossed
1279        let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1280            (book.best_bid_price(), book.best_ask_price())
1281        {
1282            bid_price >= ask_price
1283        } else {
1284            false
1285        };
1286
1287        // Iteratively uncross the orderbook
1288        while is_crossed {
1289            log::debug!(
1290                "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1291                instrument_id,
1292                book.best_bid_price(),
1293                book.best_ask_price()
1294            );
1295
1296            let bid_price = match book.best_bid_price() {
1297                Some(p) => p,
1298                None => break,
1299            };
1300            let ask_price = match book.best_ask_price() {
1301                Some(p) => p,
1302                None => break,
1303            };
1304            let bid_size = match book.best_bid_size() {
1305                Some(s) => s,
1306                None => break,
1307            };
1308            let ask_size = match book.best_ask_size() {
1309                Some(s) => s,
1310                None => break,
1311            };
1312
1313            let mut temp_deltas = Vec::new();
1314
1315            if bid_size > ask_size {
1316                // Remove ask level, reduce bid level
1317                let new_bid_size = Quantity::new(
1318                    bid_size.as_f64() - ask_size.as_f64(),
1319                    instrument.size_precision(),
1320                );
1321                temp_deltas.push(OrderBookDelta::new(
1322                    instrument_id,
1323                    BookAction::Update,
1324                    BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1325                    0,
1326                    0,
1327                    ts_init,
1328                    ts_init,
1329                ));
1330                temp_deltas.push(OrderBookDelta::new(
1331                    instrument_id,
1332                    BookAction::Delete,
1333                    BookOrder::new(
1334                        OrderSide::Sell,
1335                        ask_price,
1336                        Quantity::new(0.0, instrument.size_precision()),
1337                        0,
1338                    ),
1339                    0,
1340                    0,
1341                    ts_init,
1342                    ts_init,
1343                ));
1344            } else if bid_size < ask_size {
1345                // Remove bid level, reduce ask level
1346                let new_ask_size = Quantity::new(
1347                    ask_size.as_f64() - bid_size.as_f64(),
1348                    instrument.size_precision(),
1349                );
1350                temp_deltas.push(OrderBookDelta::new(
1351                    instrument_id,
1352                    BookAction::Update,
1353                    BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1354                    0,
1355                    0,
1356                    ts_init,
1357                    ts_init,
1358                ));
1359                temp_deltas.push(OrderBookDelta::new(
1360                    instrument_id,
1361                    BookAction::Delete,
1362                    BookOrder::new(
1363                        OrderSide::Buy,
1364                        bid_price,
1365                        Quantity::new(0.0, instrument.size_precision()),
1366                        0,
1367                    ),
1368                    0,
1369                    0,
1370                    ts_init,
1371                    ts_init,
1372                ));
1373            } else {
1374                // Equal sizes: remove both levels
1375                temp_deltas.push(OrderBookDelta::new(
1376                    instrument_id,
1377                    BookAction::Delete,
1378                    BookOrder::new(
1379                        OrderSide::Buy,
1380                        bid_price,
1381                        Quantity::new(0.0, instrument.size_precision()),
1382                        0,
1383                    ),
1384                    0,
1385                    0,
1386                    ts_init,
1387                    ts_init,
1388                ));
1389                temp_deltas.push(OrderBookDelta::new(
1390                    instrument_id,
1391                    BookAction::Delete,
1392                    BookOrder::new(
1393                        OrderSide::Sell,
1394                        ask_price,
1395                        Quantity::new(0.0, instrument.size_precision()),
1396                        0,
1397                    ),
1398                    0,
1399                    0,
1400                    ts_init,
1401                    ts_init,
1402                ));
1403            }
1404
1405            // Apply temporary deltas to the book
1406            let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
1407            book.apply_deltas(&temp_deltas_obj)?;
1408            all_deltas.extend(temp_deltas);
1409
1410            // Check if still crossed
1411            is_crossed = if let (Some(bid_price), Some(ask_price)) =
1412                (book.best_bid_price(), book.best_ask_price())
1413            {
1414                bid_price >= ask_price
1415            } else {
1416                false
1417            };
1418        }
1419
1420        // Set F_LAST flag on the final delta
1421        if let Some(last_delta) = all_deltas.last_mut() {
1422            last_delta.flags = RecordFlag::F_LAST as u8;
1423        }
1424
1425        Ok(OrderBookDeltas::new(instrument_id, all_deltas))
1426    }
1427
1428    fn handle_deltas_message(
1429        deltas: OrderBookDeltas,
1430        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1431        order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
1432        last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
1433        instrument_cache: &Arc<InstrumentCache>,
1434        active_quote_subs: &Arc<DashSet<InstrumentId>>,
1435        active_delta_subs: &Arc<DashSet<InstrumentId>>,
1436    ) {
1437        let instrument_id = deltas.instrument_id;
1438
1439        // Get instrument for crossed orderbook resolution
1440        let instrument = match instrument_cache.get(&instrument_id) {
1441            Some(inst) => inst,
1442            None => {
1443                log::error!("Cannot resolve crossed order book: no instrument for {instrument_id}");
1444                // Still emit the raw deltas if delta subscription is active
1445                if active_delta_subs.contains(&instrument_id)
1446                    && let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
1447                        OrderBookDeltas_API::new(deltas),
1448                    )))
1449                {
1450                    log::error!("Failed to emit order book deltas: {e}");
1451                }
1452                return;
1453            }
1454        };
1455
1456        // Always maintain local orderbook — both subscription types need book state
1457        let mut book = order_books
1458            .entry(instrument_id)
1459            .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1460
1461        // Resolve crossed orderbook (applies deltas internally)
1462        let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
1463        {
1464            Ok(d) => d,
1465            Err(e) => {
1466                log::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
1467                return;
1468            }
1469        };
1470
1471        // Conditionally emit QuoteTick if instrument has quote subscription
1472        if active_quote_subs.contains(&instrument_id) {
1473            // Generate QuoteTick from updated top-of-book
1474            // Edge case: If orderbook is empty after deltas, fall back to last quote
1475            let quote_opt = if let (Some(bid_price), Some(ask_price)) =
1476                (book.best_bid_price(), book.best_ask_price())
1477                && let (Some(bid_size), Some(ask_size)) =
1478                    (book.best_bid_size(), book.best_ask_size())
1479            {
1480                Some(QuoteTick::new(
1481                    instrument_id,
1482                    bid_price,
1483                    ask_price,
1484                    bid_size,
1485                    ask_size,
1486                    resolved_deltas.ts_event,
1487                    resolved_deltas.ts_init,
1488                ))
1489            } else {
1490                // Edge case: Empty orderbook levels - use last quote as fallback
1491                if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
1492                    log::debug!(
1493                        "Empty orderbook for {instrument_id} after applying deltas, using last quote"
1494                    );
1495                    last_quotes.get(&instrument_id).map(|q| *q)
1496                } else {
1497                    None
1498                }
1499            };
1500
1501            if let Some(quote) = quote_opt {
1502                // Only emit when top-of-book changes
1503                let emit_quote = !matches!(
1504                    last_quotes.get(&instrument_id),
1505                    Some(existing) if *existing == quote
1506                );
1507
1508                if emit_quote {
1509                    last_quotes.insert(instrument_id, quote);
1510                    if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
1511                        log::error!("Failed to emit quote tick: {e}");
1512                    }
1513                }
1514            } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
1515                // Partial orderbook (only one side) - log but don't emit
1516                log::debug!(
1517                    "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
1518                    book.best_bid_price(),
1519                    book.best_ask_price()
1520                );
1521            }
1522        }
1523
1524        // Conditionally emit OrderBookDeltas if instrument has delta subscription
1525        if active_delta_subs.contains(&instrument_id) {
1526            let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
1527            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1528                log::error!("Failed to emit order book deltas event: {e}");
1529            }
1530        }
1531    }
1532}