nautilus_hyperliquid/data/
mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc, RwLock,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    messages::{
26        DataEvent,
27        data::{
28            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
29            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
30            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
31            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
32            UnsubscribeQuotes, UnsubscribeTrades,
33        },
34    },
35    runner::get_data_event_sender,
36};
37use nautilus_core::{
38    UnixNanos,
39    time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_data::client::DataClient;
42use nautilus_model::{
43    data::{Bar, BarType, Data},
44    enums::{AggregationSource, BarAggregation},
45    identifiers::{ClientId, InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47    types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54    common::consts::{HYPERLIQUID_TESTNET_WS_URL, HYPERLIQUID_VENUE, HYPERLIQUID_WS_URL},
55    config::HyperliquidDataClientConfig,
56    http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57    websocket::{
58        client::HyperliquidWebSocketClient, messages::HyperliquidWsMessage,
59        parse::parse_ws_quote_tick,
60    },
61};
62
63#[derive(Debug)]
64pub struct HyperliquidDataClient {
65    client_id: ClientId,
66    #[allow(dead_code)]
67    config: HyperliquidDataClientConfig,
68    http_client: HyperliquidHttpClient,
69    ws_client: HyperliquidWebSocketClient,
70    is_connected: AtomicBool,
71    cancellation_token: CancellationToken,
72    tasks: Vec<JoinHandle<()>>,
73    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
74    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
75    /// Maps coin symbols (e.g., "BTC") to instrument IDs (e.g., "BTC-PERP")
76    /// for efficient O(1) lookup in WebSocket message handlers
77    coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
78    clock: &'static AtomicTime,
79    #[allow(dead_code)]
80    instrument_refresh_active: bool,
81}
82
83impl HyperliquidDataClient {
84    /// Creates a new [`HyperliquidDataClient`] instance.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the HTTP client fails to initialize.
89    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
90        let clock = get_atomic_clock_realtime();
91        let data_sender = get_data_event_sender();
92
93        let http_client = if let Some(private_key_str) = &config.private_key {
94            let secrets = crate::common::credential::Secrets {
95                private_key: crate::common::credential::EvmPrivateKey::new(
96                    private_key_str.clone(),
97                )?,
98                is_testnet: config.is_testnet,
99                vault_address: None,
100            };
101            HyperliquidHttpClient::with_credentials(&secrets, config.http_timeout_secs)
102        } else {
103            HyperliquidHttpClient::new(config.is_testnet, config.http_timeout_secs)
104        };
105
106        let ws_url = if config.is_testnet {
107            HYPERLIQUID_TESTNET_WS_URL
108        } else {
109            HYPERLIQUID_WS_URL
110        };
111        let ws_client = HyperliquidWebSocketClient::new(ws_url.to_string());
112
113        Ok(Self {
114            client_id,
115            config,
116            http_client,
117            ws_client,
118            is_connected: AtomicBool::new(false),
119            cancellation_token: CancellationToken::new(),
120            tasks: Vec::new(),
121            data_sender,
122            instruments: Arc::new(RwLock::new(AHashMap::new())),
123            coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
124            clock,
125            instrument_refresh_active: false,
126        })
127    }
128
129    fn venue(&self) -> Venue {
130        *HYPERLIQUID_VENUE
131    }
132
133    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
134        let instruments = self
135            .http_client
136            .request_instruments()
137            .await
138            .context("Failed to fetch instruments during bootstrap")?;
139
140        let mut instruments_map = self.instruments.write().unwrap();
141        let mut coin_map = self.coin_to_instrument_id.write().unwrap();
142
143        for instrument in &instruments {
144            let instrument_id = instrument.id();
145            instruments_map.insert(instrument_id, instrument.clone());
146
147            // Build coin-to-instrument-id index for efficient WebSocket message lookup
148            // Extract coin symbol from instrument ID (e.g., "BTC" from "BTC-PERP")
149            let symbol = instrument_id.symbol.as_str();
150            if let Some(coin) = symbol.split('-').next() {
151                coin_map.insert(Ustr::from(coin), instrument_id);
152            }
153
154            // Also add instrument to the WebSocket client's cache for fast lookups
155            // used by the WebSocket client and execution path.
156            self.ws_client.add_instrument(instrument.clone());
157        }
158
159        tracing::info!(
160            "Bootstrapped {} instruments with {} coin mappings",
161            instruments_map.len(),
162            coin_map.len()
163        );
164        Ok(instruments)
165    }
166
167    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
168        self.ws_client
169            .ensure_connected()
170            .await
171            .context("Failed to connect to Hyperliquid WebSocket")?;
172
173        // No message processing loop needed - the WebSocket client handles it internally
174        Ok(())
175    }
176
177    #[allow(dead_code)]
178    fn handle_ws_message(
179        msg: HyperliquidWsMessage,
180        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
181        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
182        coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
183        _venue: Venue,
184        clock: &'static AtomicTime,
185    ) {
186        match msg {
187            HyperliquidWsMessage::Bbo { data } => {
188                tracing::debug!("Received BBO message for coin: {}", data.coin);
189
190                // Use efficient O(1) lookup instead of iterating through all instruments
191                // Hyperliquid WebSocket sends coin="BTC", lookup returns "BTC-PERP" instrument ID
192                let coin_map = coin_to_instrument_id.read().unwrap();
193                let instrument_id = coin_map.get(&data.coin);
194
195                if let Some(&instrument_id) = instrument_id {
196                    let instruments_map = instruments.read().unwrap();
197                    if let Some(instrument) = instruments_map.get(&instrument_id) {
198                        let ts_init = clock.get_time_ns();
199
200                        match parse_ws_quote_tick(&data, instrument, ts_init) {
201                            Ok(quote_tick) => {
202                                tracing::debug!(
203                                    "Parsed quote tick for {}: bid={}, ask={}",
204                                    data.coin,
205                                    quote_tick.bid_price,
206                                    quote_tick.ask_price
207                                );
208                                if let Err(e) =
209                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
210                                {
211                                    tracing::error!("Failed to send quote tick: {e}");
212                                }
213                            }
214                            Err(e) => {
215                                tracing::error!(
216                                    "Failed to parse quote tick for {}: {e}",
217                                    data.coin
218                                );
219                            }
220                        }
221                    }
222                } else {
223                    tracing::warn!(
224                        "Received BBO for unknown coin: {} (no matching instrument found)",
225                        data.coin
226                    );
227                }
228            }
229            _ => {
230                // Log other message types for debugging
231                tracing::trace!("Received WebSocket message: {:?}", msg);
232            }
233        }
234    }
235
236    fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
237        let instruments = self.instruments.read().unwrap();
238        instruments
239            .get(instrument_id)
240            .cloned()
241            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
242    }
243}
244
245fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
246    value
247        .and_then(|dt| dt.timestamp_nanos_opt())
248        .and_then(|nanos| u64::try_from(nanos).ok())
249        .map(UnixNanos::from)
250}
251
252impl HyperliquidDataClient {
253    #[allow(dead_code)]
254    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
255        if let Err(e) = sender.send(DataEvent::Data(data)) {
256            tracing::error!("Failed to emit data event: {e}");
257        }
258    }
259}
260
261#[async_trait::async_trait]
262impl DataClient for HyperliquidDataClient {
263    fn client_id(&self) -> ClientId {
264        self.client_id
265    }
266
267    fn venue(&self) -> Option<Venue> {
268        Some(self.venue())
269    }
270
271    fn start(&mut self) -> anyhow::Result<()> {
272        tracing::info!("Starting Hyperliquid data client {}", self.client_id);
273        Ok(())
274    }
275
276    fn stop(&mut self) -> anyhow::Result<()> {
277        tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
278        self.cancellation_token.cancel();
279        self.is_connected.store(false, Ordering::Relaxed);
280        Ok(())
281    }
282
283    fn reset(&mut self) -> anyhow::Result<()> {
284        tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
285        self.is_connected.store(false, Ordering::Relaxed);
286        self.cancellation_token = CancellationToken::new();
287        self.tasks.clear();
288        Ok(())
289    }
290
291    fn dispose(&mut self) -> anyhow::Result<()> {
292        tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
293        self.stop()
294    }
295
296    fn is_connected(&self) -> bool {
297        self.is_connected.load(Ordering::Acquire)
298    }
299
300    fn is_disconnected(&self) -> bool {
301        !self.is_connected()
302    }
303
304    async fn connect(&mut self) -> anyhow::Result<()> {
305        if self.is_connected() {
306            return Ok(());
307        }
308
309        // Bootstrap instruments from HTTP API
310        let _instruments = self
311            .bootstrap_instruments()
312            .await
313            .context("Failed to bootstrap instruments")?;
314
315        // Connect WebSocket client
316        self.spawn_ws()
317            .await
318            .context("Failed to spawn WebSocket client")?;
319
320        self.is_connected.store(true, Ordering::Relaxed);
321        tracing::info!("Hyperliquid data client connected");
322
323        Ok(())
324    }
325
326    async fn disconnect(&mut self) -> anyhow::Result<()> {
327        if !self.is_connected() {
328            return Ok(());
329        }
330
331        // Cancel all tasks
332        self.cancellation_token.cancel();
333
334        // Wait for all tasks to complete
335        for task in self.tasks.drain(..) {
336            if let Err(e) = task.await {
337                tracing::error!("Error waiting for task to complete: {e}");
338            }
339        }
340
341        // Disconnect WebSocket client
342        if let Err(e) = self.ws_client.disconnect().await {
343            tracing::error!("Error disconnecting WebSocket client: {e}");
344        }
345
346        // Clear state
347        {
348            let mut instruments = self.instruments.write().unwrap();
349            instruments.clear();
350        }
351
352        self.is_connected.store(false, Ordering::Relaxed);
353        tracing::info!("Hyperliquid data client disconnected");
354
355        Ok(())
356    }
357
358    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
359        tracing::debug!("Requesting all instruments");
360
361        let instruments = {
362            let instruments_map = self.instruments.read().unwrap();
363            instruments_map.values().cloned().collect()
364        };
365
366        let response = DataResponse::Instruments(InstrumentsResponse::new(
367            request.request_id,
368            request.client_id.unwrap_or(self.client_id),
369            self.venue(),
370            instruments,
371            datetime_to_unix_nanos(request.start),
372            datetime_to_unix_nanos(request.end),
373            self.clock.get_time_ns(),
374            request.params.clone(),
375        ));
376
377        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
378            tracing::error!("Failed to send instruments response: {e}");
379        }
380
381        Ok(())
382    }
383
384    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
385        tracing::debug!("Requesting instrument: {}", request.instrument_id);
386
387        let instrument = self.get_instrument(&request.instrument_id)?;
388
389        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
390            request.request_id,
391            request.client_id.unwrap_or(self.client_id),
392            instrument.id(),
393            instrument,
394            datetime_to_unix_nanos(request.start),
395            datetime_to_unix_nanos(request.end),
396            self.clock.get_time_ns(),
397            request.params.clone(),
398        )));
399
400        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
401            tracing::error!("Failed to send instrument response: {e}");
402        }
403
404        Ok(())
405    }
406
407    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
408        tracing::debug!("Requesting bars for {}", request.bar_type);
409
410        let http = self.http_client.clone();
411        let sender = self.data_sender.clone();
412        let bar_type = request.bar_type;
413        let start = request.start;
414        let end = request.end;
415        let limit = request.limit.map(|n| n.get() as u32);
416        let request_id = request.request_id;
417        let client_id = request.client_id.unwrap_or(self.client_id);
418        let params = request.params.clone();
419        let clock = self.clock;
420        let start_nanos = datetime_to_unix_nanos(start);
421        let end_nanos = datetime_to_unix_nanos(end);
422        let instruments = Arc::clone(&self.instruments);
423
424        tokio::spawn(async move {
425            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
426                Ok(bars) => {
427                    let response = DataResponse::Bars(BarsResponse::new(
428                        request_id,
429                        client_id,
430                        bar_type,
431                        bars,
432                        start_nanos,
433                        end_nanos,
434                        clock.get_time_ns(),
435                        params,
436                    ));
437                    if let Err(e) = sender.send(DataEvent::Response(response)) {
438                        tracing::error!("Failed to send bars response: {e}");
439                    }
440                }
441                Err(e) => tracing::error!("Bar request failed: {e:?}"),
442            }
443        });
444
445        Ok(())
446    }
447
448    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
449        tracing::debug!("Requesting trades for {}", request.instrument_id);
450
451        let trades = Vec::new();
452
453        let response = DataResponse::Trades(TradesResponse::new(
454            request.request_id,
455            request.client_id.unwrap_or(self.client_id),
456            request.instrument_id,
457            trades,
458            datetime_to_unix_nanos(request.start),
459            datetime_to_unix_nanos(request.end),
460            self.clock.get_time_ns(),
461            request.params.clone(),
462        ));
463
464        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
465            tracing::error!("Failed to send trades response: {e}");
466        }
467
468        Ok(())
469    }
470
471    fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
472        tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
473
474        // Validate instrument exists
475        let instruments = self.instruments.read().unwrap();
476        if !instruments.contains_key(&subscription.instrument_id) {
477            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
478        }
479
480        // Extract coin symbol from instrument ID
481        let coin = subscription
482            .instrument_id
483            .symbol
484            .as_str()
485            .split('-')
486            .next()
487            .context("Invalid instrument symbol")?;
488        let coin = Ustr::from(coin);
489
490        // Clone WebSocket client for async task
491        let ws = self.ws_client.clone();
492
493        // Spawn async task to subscribe
494        tokio::spawn(async move {
495            if let Err(e) = ws.subscribe_trades(coin).await {
496                tracing::error!("Failed to subscribe to trades: {e:?}");
497            }
498        });
499
500        tracing::info!("Subscribed to trades for {}", subscription.instrument_id);
501
502        Ok(())
503    }
504
505    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
506        tracing::debug!(
507            "Unsubscribing from trades: {}",
508            unsubscription.instrument_id
509        );
510
511        // Extract coin symbol from instrument ID
512        let coin = unsubscription
513            .instrument_id
514            .symbol
515            .as_str()
516            .split('-')
517            .next()
518            .context("Invalid instrument symbol")?;
519        let coin = Ustr::from(coin);
520
521        // Clone WebSocket client for async task
522        let ws = self.ws_client.clone();
523
524        // Spawn async task to unsubscribe
525        tokio::spawn(async move {
526            if let Err(e) = ws.unsubscribe_trades(coin).await {
527                tracing::error!("Failed to unsubscribe from trades: {e:?}");
528            }
529        });
530
531        tracing::info!(
532            "Unsubscribed from trades for {}",
533            unsubscription.instrument_id
534        );
535
536        Ok(())
537    }
538
539    fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
540        tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
541
542        // Validate book type
543        if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
544            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
545        }
546
547        // Validate instrument exists
548        let instruments = self.instruments.read().unwrap();
549        if !instruments.contains_key(&subscription.instrument_id) {
550            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
551        }
552        drop(instruments);
553
554        // Extract coin symbol from instrument ID
555        let coin = subscription
556            .instrument_id
557            .symbol
558            .as_str()
559            .split('-')
560            .next()
561            .context("Invalid instrument symbol")?;
562        let coin = Ustr::from(coin);
563
564        // Clone WebSocket client for async task
565        let ws = self.ws_client.clone();
566
567        // Spawn async task to subscribe
568        tokio::spawn(async move {
569            if let Err(e) = ws.subscribe_book(coin).await {
570                tracing::error!("Failed to subscribe to book deltas: {e:?}");
571            }
572        });
573
574        tracing::info!(
575            "Subscribed to book deltas for {}",
576            subscription.instrument_id
577        );
578
579        Ok(())
580    }
581
582    fn unsubscribe_book_deltas(
583        &mut self,
584        unsubscription: &UnsubscribeBookDeltas,
585    ) -> anyhow::Result<()> {
586        tracing::debug!(
587            "Unsubscribing from book deltas: {}",
588            unsubscription.instrument_id
589        );
590
591        // Extract coin symbol from instrument ID
592        let coin = unsubscription
593            .instrument_id
594            .symbol
595            .as_str()
596            .split('-')
597            .next()
598            .context("Invalid instrument symbol")?;
599        let coin = Ustr::from(coin);
600
601        // Clone WebSocket client for async task
602        let ws = self.ws_client.clone();
603
604        // Spawn async task to unsubscribe
605        tokio::spawn(async move {
606            if let Err(e) = ws.unsubscribe_book(coin).await {
607                tracing::error!("Failed to unsubscribe from book deltas: {e:?}");
608            }
609        });
610
611        tracing::info!(
612            "Unsubscribed from book deltas for {}",
613            unsubscription.instrument_id
614        );
615
616        Ok(())
617    }
618
619    fn subscribe_book_snapshots(
620        &mut self,
621        subscription: &SubscribeBookSnapshots,
622    ) -> anyhow::Result<()> {
623        tracing::debug!(
624            "Subscribing to book snapshots: {}",
625            subscription.instrument_id
626        );
627
628        // Validate book type
629        if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
630            anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
631        }
632
633        // Validate instrument exists
634        let instruments = self.instruments.read().unwrap();
635        if !instruments.contains_key(&subscription.instrument_id) {
636            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
637        }
638        drop(instruments);
639
640        // Extract coin symbol from instrument ID
641        let coin = subscription
642            .instrument_id
643            .symbol
644            .as_str()
645            .split('-')
646            .next()
647            .context("Invalid instrument symbol")?;
648        let coin = Ustr::from(coin);
649
650        // Clone WebSocket client for async task
651        let ws = self.ws_client.clone();
652
653        // Spawn async task to subscribe
654        tokio::spawn(async move {
655            if let Err(e) = ws.subscribe_bbo(coin).await {
656                tracing::error!("Failed to subscribe to book snapshots: {e:?}");
657            }
658        });
659
660        tracing::info!(
661            "Subscribed to book snapshots for {}",
662            subscription.instrument_id
663        );
664
665        Ok(())
666    }
667
668    fn unsubscribe_book_snapshots(
669        &mut self,
670        unsubscription: &UnsubscribeBookSnapshots,
671    ) -> anyhow::Result<()> {
672        tracing::debug!(
673            "Unsubscribing from book snapshots: {}",
674            unsubscription.instrument_id
675        );
676
677        // Extract coin symbol from instrument ID
678        let coin = unsubscription
679            .instrument_id
680            .symbol
681            .as_str()
682            .split('-')
683            .next()
684            .context("Invalid instrument symbol")?;
685        let coin = Ustr::from(coin);
686
687        // Clone WebSocket client for async task
688        let ws = self.ws_client.clone();
689
690        // Spawn async task to unsubscribe
691        tokio::spawn(async move {
692            if let Err(e) = ws.unsubscribe_bbo(coin).await {
693                tracing::error!("Failed to unsubscribe from book snapshots: {e:?}");
694            }
695        });
696
697        tracing::info!(
698            "Unsubscribed from book snapshots for {}",
699            unsubscription.instrument_id
700        );
701
702        Ok(())
703    }
704
705    fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
706        tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
707
708        // Validate instrument exists
709        let instruments = self.instruments.read().unwrap();
710        if !instruments.contains_key(&subscription.instrument_id) {
711            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
712        }
713        drop(instruments);
714
715        // Extract coin symbol from instrument ID
716        let coin = subscription
717            .instrument_id
718            .symbol
719            .as_str()
720            .split('-')
721            .next()
722            .context("Invalid instrument symbol")?;
723        let coin = Ustr::from(coin);
724
725        // Clone WebSocket client for async task
726        let ws = self.ws_client.clone();
727
728        // Spawn async task to subscribe
729        tokio::spawn(async move {
730            if let Err(e) = ws.subscribe_bbo(coin).await {
731                tracing::error!("Failed to subscribe to quotes: {e:?}");
732            }
733        });
734
735        tracing::info!("Subscribed to quotes for {}", subscription.instrument_id);
736
737        Ok(())
738    }
739
740    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
741        tracing::debug!(
742            "Unsubscribing from quotes: {}",
743            unsubscription.instrument_id
744        );
745
746        // Extract coin symbol from instrument ID
747        let coin = unsubscription
748            .instrument_id
749            .symbol
750            .as_str()
751            .split('-')
752            .next()
753            .context("Invalid instrument symbol")?;
754        let coin = Ustr::from(coin);
755
756        // Clone WebSocket client for async task
757        let ws = self.ws_client.clone();
758
759        // Spawn async task to unsubscribe
760        tokio::spawn(async move {
761            if let Err(e) = ws.unsubscribe_bbo(coin).await {
762                tracing::error!("Failed to unsubscribe from quotes: {e:?}");
763            }
764        });
765
766        tracing::info!(
767            "Unsubscribed from quotes for {}",
768            unsubscription.instrument_id
769        );
770
771        Ok(())
772    }
773
774    fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
775        tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
776
777        // Validate instrument exists
778        let instruments = self.instruments.read().unwrap();
779        let instrument_id = subscription.bar_type.instrument_id();
780        if !instruments.contains_key(&instrument_id) {
781            anyhow::bail!("Instrument {} not found", instrument_id);
782        }
783
784        drop(instruments);
785
786        // Convert bar type to interval
787        let interval = bar_type_to_interval(&subscription.bar_type)?;
788
789        // Extract coin symbol from instrument ID
790        let coin = instrument_id
791            .symbol
792            .as_str()
793            .split('-')
794            .next()
795            .context("Invalid instrument symbol")?;
796        let coin = Ustr::from(coin);
797
798        // Clone WebSocket client for async task
799        let ws = self.ws_client.clone();
800
801        // Spawn async task to subscribe
802        tokio::spawn(async move {
803            if let Err(e) = ws.subscribe_candle(coin, interval).await {
804                tracing::error!("Failed to subscribe to bars: {e:?}");
805            }
806        });
807
808        tracing::info!("Subscribed to bars for {}", subscription.bar_type);
809
810        Ok(())
811    }
812
813    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
814        tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
815
816        // Convert bar type to interval
817        let interval = bar_type_to_interval(&unsubscription.bar_type)?;
818
819        // Extract coin symbol from instrument ID
820        let instrument_id = unsubscription.bar_type.instrument_id();
821        let coin = instrument_id
822            .symbol
823            .as_str()
824            .split('-')
825            .next()
826            .context("Invalid instrument symbol")?;
827        let coin = Ustr::from(coin);
828
829        // Clone WebSocket client for async task
830        let ws = self.ws_client.clone();
831
832        // Spawn async task to unsubscribe
833        tokio::spawn(async move {
834            if let Err(e) = ws.unsubscribe_candle(coin, interval).await {
835                tracing::error!("Failed to unsubscribe from bars: {e:?}");
836            }
837        });
838
839        tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
840
841        Ok(())
842    }
843}
844
845/// Convert BarType to Hyperliquid interval string.
846fn bar_type_to_interval(bar_type: &BarType) -> anyhow::Result<String> {
847    let spec = bar_type.spec();
848    let step = spec.step.get();
849
850    anyhow::ensure!(
851        bar_type.aggregation_source() == AggregationSource::External,
852        "Only EXTERNAL aggregation is supported"
853    );
854
855    let interval = match spec.aggregation {
856        BarAggregation::Minute => format!("{step}m"),
857        BarAggregation::Hour => format!("{step}h"),
858        BarAggregation::Day => format!("{step}d"),
859        a => anyhow::bail!("Hyperliquid does not support {a:?} aggregation"),
860    };
861
862    Ok(interval)
863}
864
865/// Convert HyperliquidCandle to Nautilus Bar.
866fn candle_to_bar(
867    candle: &HyperliquidCandle,
868    bar_type: BarType,
869    price_precision: u8,
870    size_precision: u8,
871) -> anyhow::Result<Bar> {
872    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000); // Convert ms to ns
873    let ts_event = ts_init;
874
875    let open = candle.open.parse::<f64>().context("parse open price")?;
876    let high = candle.high.parse::<f64>().context("parse high price")?;
877    let low = candle.low.parse::<f64>().context("parse low price")?;
878    let close = candle.close.parse::<f64>().context("parse close price")?;
879    let volume = candle.volume.parse::<f64>().context("parse volume")?;
880
881    Ok(Bar::new(
882        bar_type,
883        Price::new(open, price_precision),
884        Price::new(high, price_precision),
885        Price::new(low, price_precision),
886        Price::new(close, price_precision),
887        Quantity::new(volume, size_precision),
888        ts_event,
889        ts_init,
890    ))
891}
892
893/// Request bars from HTTP API.
894async fn request_bars_from_http(
895    http_client: HyperliquidHttpClient,
896    bar_type: BarType,
897    start: Option<DateTime<Utc>>,
898    end: Option<DateTime<Utc>>,
899    limit: Option<u32>,
900    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
901) -> anyhow::Result<Vec<Bar>> {
902    // Get instrument details for precision
903    let instrument_id = bar_type.instrument_id();
904    let instrument = {
905        let guard = instruments.read().unwrap();
906        guard
907            .get(&instrument_id)
908            .cloned()
909            .context("Instrument not found in cache")?
910    };
911
912    let price_precision = instrument.price_precision();
913    let size_precision = instrument.size_precision();
914
915    // Extract coin symbol from instrument ID (e.g., "BTC-PERP.HYPERLIQUID" -> "BTC")
916    let coin = instrument_id
917        .symbol
918        .as_str()
919        .split('-')
920        .next()
921        .context("Invalid instrument symbol")?;
922
923    // Convert bar type to Hyperliquid interval
924    let interval = bar_type_to_interval(&bar_type)?;
925
926    // Calculate time range (Hyperliquid uses milliseconds)
927    let now = Utc::now();
928    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
929    let start_time = if let Some(start) = start {
930        start.timestamp_millis() as u64
931    } else {
932        // Default to 1000 bars before end_time
933        let spec = bar_type.spec();
934        let step_ms = match spec.aggregation {
935            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
936            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
937            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
938            _ => 60_000, // Default to 1 minute
939        };
940        end_time.saturating_sub(1000 * step_ms)
941    };
942
943    // Fetch candles from API
944    let response = http_client
945        .info_candle_snapshot(coin, &interval, start_time, end_time)
946        .await
947        .context("Failed to fetch candle snapshot from Hyperliquid")?;
948
949    // Convert candles to bars
950    let mut bars: Vec<Bar> = response
951        .data
952        .iter()
953        .filter_map(|candle| {
954            candle_to_bar(candle, bar_type, price_precision, size_precision)
955                .map_err(|e| {
956                    tracing::warn!("Failed to convert candle to bar: {e}");
957                    e
958                })
959                .ok()
960        })
961        .collect();
962
963    // Apply limit if specified
964    if let Some(limit) = limit
965        && bars.len() > limit as usize
966    {
967        bars = bars.into_iter().take(limit as usize).collect();
968    }
969
970    tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
971    Ok(bars)
972}