nautilus_hyperliquid/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc, RwLock,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    live::{runner::get_data_event_sender, runtime::get_runtime},
26    messages::{
27        DataEvent,
28        data::{
29            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
30            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
31            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
32            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
33            UnsubscribeQuotes, UnsubscribeTrades,
34        },
35    },
36};
37use nautilus_core::{
38    UnixNanos,
39    datetime::datetime_to_unix_nanos,
40    time::{AtomicTime, get_atomic_clock_realtime},
41};
42use nautilus_data::client::DataClient;
43use nautilus_model::{
44    data::{Bar, BarType, Data, OrderBookDeltas_API},
45    enums::{BarAggregation, BookType},
46    identifiers::{ClientId, InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    types::{Price, Quantity},
49};
50use tokio::task::JoinHandle;
51use tokio_util::sync::CancellationToken;
52use ustr::Ustr;
53
54use crate::{
55    common::{HyperliquidProductType, consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
56    config::HyperliquidDataClientConfig,
57    http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
58    websocket::{
59        client::HyperliquidWebSocketClient,
60        messages::{HyperliquidWsMessage, NautilusWsMessage},
61        parse::{
62            parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
63        },
64    },
65};
66
67#[derive(Debug)]
68pub struct HyperliquidDataClient {
69    client_id: ClientId,
70    #[allow(dead_code)]
71    config: HyperliquidDataClientConfig,
72    http_client: HyperliquidHttpClient,
73    ws_client: HyperliquidWebSocketClient,
74    is_connected: AtomicBool,
75    cancellation_token: CancellationToken,
76    tasks: Vec<JoinHandle<()>>,
77    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
78    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
79    /// Maps coin symbols (e.g., "BTC") to instrument IDs (e.g., "BTC-PERP")
80    /// for efficient O(1) lookup in WebSocket message handlers
81    coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
82    clock: &'static AtomicTime,
83    #[allow(dead_code)]
84    instrument_refresh_active: bool,
85}
86
87impl HyperliquidDataClient {
88    /// Creates a new [`HyperliquidDataClient`] instance.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the HTTP client fails to initialize.
93    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
94        let clock = get_atomic_clock_realtime();
95        let data_sender = get_data_event_sender();
96
97        let http_client = if let Some(private_key_str) = &config.private_key {
98            let secrets = crate::common::credential::Secrets {
99                private_key: crate::common::credential::EvmPrivateKey::new(
100                    private_key_str.clone(),
101                )?,
102                is_testnet: config.is_testnet,
103                vault_address: None,
104            };
105            HyperliquidHttpClient::with_credentials(
106                &secrets,
107                config.http_timeout_secs,
108                config.http_proxy_url.clone(),
109            )?
110        } else {
111            HyperliquidHttpClient::new(
112                config.is_testnet,
113                config.http_timeout_secs,
114                config.http_proxy_url.clone(),
115            )?
116        };
117
118        // Note: Rust data client is not the primary interface; Python adapter is used instead.
119        // Defaulting to Perp for basic functionality.
120        let ws_client = HyperliquidWebSocketClient::new(
121            None,
122            config.is_testnet,
123            HyperliquidProductType::Perp,
124            None,
125        );
126
127        Ok(Self {
128            client_id,
129            config,
130            http_client,
131            ws_client,
132            is_connected: AtomicBool::new(false),
133            cancellation_token: CancellationToken::new(),
134            tasks: Vec::new(),
135            data_sender,
136            instruments: Arc::new(RwLock::new(AHashMap::new())),
137            coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
138            clock,
139            instrument_refresh_active: false,
140        })
141    }
142
143    fn venue(&self) -> Venue {
144        *HYPERLIQUID_VENUE
145    }
146
147    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
148        let instruments = self
149            .http_client
150            .request_instruments()
151            .await
152            .context("failed to fetch instruments during bootstrap")?;
153
154        let mut instruments_map = self.instruments.write().unwrap();
155        let mut coin_map = self.coin_to_instrument_id.write().unwrap();
156
157        for instrument in &instruments {
158            let instrument_id = instrument.id();
159            instruments_map.insert(instrument_id, instrument.clone());
160
161            // Build coin-to-instrument-id index for efficient WebSocket message lookup
162            // Use raw_symbol which contains Hyperliquid's coin ticker (e.g., "BTC")
163            let coin = instrument.raw_symbol().inner();
164            if instrument_id.symbol.as_str().starts_with("BTCUSD") {
165                log::warn!(
166                    "DEBUG bootstrap BTCUSD: instrument_id={}, raw_symbol={}, coin={}",
167                    instrument_id,
168                    instrument.raw_symbol(),
169                    coin
170                );
171            }
172            coin_map.insert(coin, instrument_id);
173
174            self.ws_client.cache_instrument(instrument.clone());
175        }
176
177        tracing::info!(
178            "Bootstrapped {} instruments with {} coin mappings",
179            instruments_map.len(),
180            coin_map.len()
181        );
182        Ok(instruments)
183    }
184
185    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
186        // Clone client before connecting so the clone can have out_rx set
187        let mut ws_client = self.ws_client.clone();
188
189        ws_client
190            .connect()
191            .await
192            .context("failed to connect to Hyperliquid WebSocket")?;
193
194        let _data_sender = self.data_sender.clone();
195        let _instruments = Arc::clone(&self.instruments);
196        let _coin_to_instrument_id = Arc::clone(&self.coin_to_instrument_id);
197        let _venue = self.venue();
198        let _clock = self.clock;
199        let cancellation_token = self.cancellation_token.clone();
200
201        let task = get_runtime().spawn(async move {
202            tracing::info!("Hyperliquid WebSocket consumption loop started");
203
204            loop {
205                tokio::select! {
206                    _ = cancellation_token.cancelled() => {
207                        tracing::info!("WebSocket consumption loop cancelled");
208                        break;
209                    }
210                    msg_opt = ws_client.next_event() => {
211                        if let Some(msg) = msg_opt {
212                            match msg {
213                                // Handled by python/websocket.rs
214                                NautilusWsMessage::Trades(_)
215                                | NautilusWsMessage::Quote(_)
216                                | NautilusWsMessage::Deltas(_)
217                                | NautilusWsMessage::Candle(_)
218                                | NautilusWsMessage::MarkPrice(_)
219                                | NautilusWsMessage::IndexPrice(_)
220                                | NautilusWsMessage::FundingRate(_) => {}
221                                NautilusWsMessage::Reconnected => {
222                                    tracing::info!("WebSocket reconnected");
223                                }
224                                NautilusWsMessage::Error(e) => {
225                                    tracing::error!("WebSocket error: {e}");
226                                }
227                                NautilusWsMessage::ExecutionReports(_) => {
228                                    // Handled by execution client
229                                }
230                            }
231                        } else {
232                            // Connection closed or error
233                            tracing::warn!("WebSocket next_event returned None, connection may be closed");
234                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
235                        }
236                    }
237                }
238            }
239
240            tracing::info!("Hyperliquid WebSocket consumption loop finished");
241        });
242
243        self.tasks.push(task);
244        tracing::info!("WebSocket consumption task spawned");
245
246        Ok(())
247    }
248
249    #[allow(dead_code)]
250    fn handle_ws_message(
251        msg: HyperliquidWsMessage,
252        ws_client: &HyperliquidWebSocketClient,
253        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
254        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
255        coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
256        _venue: Venue,
257        clock: &'static AtomicTime,
258    ) {
259        match msg {
260            HyperliquidWsMessage::Bbo { data } => {
261                let coin = data.coin;
262                tracing::debug!("Received BBO message for coin: {coin}");
263
264                // Use efficient O(1) lookup instead of iterating through all instruments
265                // Hyperliquid WebSocket sends coin="BTC", lookup returns "BTC-PERP" instrument ID
266                let coin_map = coin_to_instrument_id.read().unwrap();
267                let instrument_id = coin_map.get(&data.coin);
268
269                if let Some(&instrument_id) = instrument_id {
270                    let instruments_map = instruments.read().unwrap();
271                    if let Some(instrument) = instruments_map.get(&instrument_id) {
272                        let ts_init = clock.get_time_ns();
273
274                        match parse_ws_quote_tick(&data, instrument, ts_init) {
275                            Ok(quote_tick) => {
276                                tracing::debug!(
277                                    "Parsed quote tick for {}: bid={}, ask={}",
278                                    data.coin,
279                                    quote_tick.bid_price,
280                                    quote_tick.ask_price
281                                );
282                                if let Err(e) =
283                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
284                                {
285                                    tracing::error!("Failed to send quote tick: {e}");
286                                }
287                            }
288                            Err(e) => {
289                                tracing::error!(
290                                    "Failed to parse quote tick for {}: {e}",
291                                    data.coin
292                                );
293                            }
294                        }
295                    }
296                } else {
297                    tracing::warn!(
298                        "Received BBO for unknown coin: {} (no matching instrument found)",
299                        data.coin
300                    );
301                }
302            }
303            HyperliquidWsMessage::Trades { data } => {
304                let count = data.len();
305                tracing::debug!("Received {count} trade(s)");
306
307                // Process each trade in the batch
308                for trade_data in data {
309                    let coin = trade_data.coin;
310                    let coin_map = coin_to_instrument_id.read().unwrap();
311
312                    if let Some(&instrument_id) = coin_map.get(&coin) {
313                        let instruments_map = instruments.read().unwrap();
314                        if let Some(instrument) = instruments_map.get(&instrument_id) {
315                            let ts_init = clock.get_time_ns();
316
317                            match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
318                                Ok(trade_tick) => {
319                                    if let Err(e) =
320                                        data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
321                                    {
322                                        tracing::error!("Failed to send trade tick: {e}");
323                                    }
324                                }
325                                Err(e) => {
326                                    tracing::error!("Failed to parse trade tick for {coin}: {e}");
327                                }
328                            }
329                        }
330                    } else {
331                        tracing::warn!("Received trade for unknown coin: {coin}");
332                    }
333                }
334            }
335            HyperliquidWsMessage::L2Book { data } => {
336                let coin = data.coin;
337                tracing::debug!("Received L2 book update for coin: {coin}");
338
339                let coin_map = coin_to_instrument_id.read().unwrap();
340                if let Some(&instrument_id) = coin_map.get(&data.coin) {
341                    let instruments_map = instruments.read().unwrap();
342                    if let Some(instrument) = instruments_map.get(&instrument_id) {
343                        let ts_init = clock.get_time_ns();
344
345                        match parse_ws_order_book_deltas(&data, instrument, ts_init) {
346                            Ok(deltas) => {
347                                if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
348                                    OrderBookDeltas_API::new(deltas),
349                                ))) {
350                                    tracing::error!("Failed to send order book deltas: {e}");
351                                }
352                            }
353                            Err(e) => {
354                                tracing::error!(
355                                    "Failed to parse order book deltas for {}: {e}",
356                                    data.coin
357                                );
358                            }
359                        }
360                    }
361                } else {
362                    tracing::warn!("Received L2 book for unknown coin: {coin}");
363                }
364            }
365            HyperliquidWsMessage::Candle { data } => {
366                let coin = &data.s;
367                let interval = &data.i;
368                tracing::debug!("Received candle for {coin}:{interval}");
369
370                if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
371                    let coin = Ustr::from(&data.s);
372                    let coin_map = coin_to_instrument_id.read().unwrap();
373
374                    if let Some(&instrument_id) = coin_map.get(&coin) {
375                        let instruments_map = instruments.read().unwrap();
376                        if let Some(instrument) = instruments_map.get(&instrument_id) {
377                            let ts_init = clock.get_time_ns();
378
379                            match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
380                                Ok(bar) => {
381                                    if let Err(e) =
382                                        data_sender.send(DataEvent::Data(Data::Bar(bar)))
383                                    {
384                                        tracing::error!("Failed to send bar data: {e}");
385                                    }
386                                }
387                                Err(e) => {
388                                    tracing::error!("Failed to parse candle for {coin}: {e}");
389                                }
390                            }
391                        }
392                    } else {
393                        tracing::warn!("Received candle for unknown coin: {coin}");
394                    }
395                } else {
396                    tracing::debug!("Received candle for {coin}:{interval} but no BarType tracked");
397                }
398            }
399            _ => {
400                // Log other message types for debugging
401                tracing::trace!("Received unhandled WebSocket message: {:?}", msg);
402            }
403        }
404    }
405
406    fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
407        let instruments = self.instruments.read().unwrap();
408        instruments
409            .get(instrument_id)
410            .cloned()
411            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
412    }
413}
414
415impl HyperliquidDataClient {
416    #[allow(dead_code)]
417    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
418        if let Err(e) = sender.send(DataEvent::Data(data)) {
419            tracing::error!("Failed to emit data event: {e}");
420        }
421    }
422}
423
424#[async_trait::async_trait(?Send)]
425impl DataClient for HyperliquidDataClient {
426    fn client_id(&self) -> ClientId {
427        self.client_id
428    }
429
430    fn venue(&self) -> Option<Venue> {
431        Some(self.venue())
432    }
433
434    fn start(&mut self) -> anyhow::Result<()> {
435        tracing::info!(
436            client_id = %self.client_id,
437            is_testnet = self.config.is_testnet,
438            http_proxy_url = ?self.config.http_proxy_url,
439            ws_proxy_url = ?self.config.ws_proxy_url,
440            "Starting Hyperliquid data client"
441        );
442        Ok(())
443    }
444
445    fn stop(&mut self) -> anyhow::Result<()> {
446        tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
447        self.cancellation_token.cancel();
448        self.is_connected.store(false, Ordering::Relaxed);
449        Ok(())
450    }
451
452    fn reset(&mut self) -> anyhow::Result<()> {
453        tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
454        self.is_connected.store(false, Ordering::Relaxed);
455        self.cancellation_token = CancellationToken::new();
456        self.tasks.clear();
457        Ok(())
458    }
459
460    fn dispose(&mut self) -> anyhow::Result<()> {
461        tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
462        self.stop()
463    }
464
465    fn is_connected(&self) -> bool {
466        self.is_connected.load(Ordering::Acquire)
467    }
468
469    fn is_disconnected(&self) -> bool {
470        !self.is_connected()
471    }
472
473    async fn connect(&mut self) -> anyhow::Result<()> {
474        if self.is_connected() {
475            return Ok(());
476        }
477
478        // Bootstrap instruments from HTTP API
479        let _instruments = self
480            .bootstrap_instruments()
481            .await
482            .context("failed to bootstrap instruments")?;
483
484        // Connect WebSocket client
485        self.spawn_ws()
486            .await
487            .context("failed to spawn WebSocket client")?;
488
489        self.is_connected.store(true, Ordering::Relaxed);
490        tracing::info!(client_id = %self.client_id, "Connected");
491
492        Ok(())
493    }
494
495    async fn disconnect(&mut self) -> anyhow::Result<()> {
496        if !self.is_connected() {
497            return Ok(());
498        }
499
500        // Cancel all tasks
501        self.cancellation_token.cancel();
502
503        // Wait for all tasks to complete
504        for task in self.tasks.drain(..) {
505            if let Err(e) = task.await {
506                tracing::error!("Error waiting for task to complete: {e}");
507            }
508        }
509
510        // Disconnect WebSocket client
511        if let Err(e) = self.ws_client.disconnect().await {
512            tracing::error!("Error disconnecting WebSocket client: {e}");
513        }
514
515        // Clear state
516        {
517            let mut instruments = self.instruments.write().unwrap();
518            instruments.clear();
519        }
520
521        self.is_connected.store(false, Ordering::Relaxed);
522        tracing::info!(client_id = %self.client_id, "Disconnected");
523
524        Ok(())
525    }
526
527    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
528        tracing::debug!("Requesting all instruments");
529
530        let instruments = {
531            let instruments_map = self.instruments.read().unwrap();
532            instruments_map.values().cloned().collect()
533        };
534
535        let response = DataResponse::Instruments(InstrumentsResponse::new(
536            request.request_id,
537            request.client_id.unwrap_or(self.client_id),
538            self.venue(),
539            instruments,
540            datetime_to_unix_nanos(request.start),
541            datetime_to_unix_nanos(request.end),
542            self.clock.get_time_ns(),
543            request.params.clone(),
544        ));
545
546        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
547            tracing::error!("Failed to send instruments response: {e}");
548        }
549
550        Ok(())
551    }
552
553    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
554        tracing::debug!("Requesting instrument: {}", request.instrument_id);
555
556        let instrument = self.get_instrument(&request.instrument_id)?;
557
558        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
559            request.request_id,
560            request.client_id.unwrap_or(self.client_id),
561            instrument.id(),
562            instrument,
563            datetime_to_unix_nanos(request.start),
564            datetime_to_unix_nanos(request.end),
565            self.clock.get_time_ns(),
566            request.params.clone(),
567        )));
568
569        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
570            tracing::error!("Failed to send instrument response: {e}");
571        }
572
573        Ok(())
574    }
575
576    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
577        tracing::debug!("Requesting bars for {}", request.bar_type);
578
579        let http = self.http_client.clone();
580        let sender = self.data_sender.clone();
581        let bar_type = request.bar_type;
582        let start = request.start;
583        let end = request.end;
584        let limit = request.limit.map(|n| n.get() as u32);
585        let request_id = request.request_id;
586        let client_id = request.client_id.unwrap_or(self.client_id);
587        let params = request.params.clone();
588        let clock = self.clock;
589        let start_nanos = datetime_to_unix_nanos(start);
590        let end_nanos = datetime_to_unix_nanos(end);
591        let instruments = Arc::clone(&self.instruments);
592
593        get_runtime().spawn(async move {
594            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
595                Ok(bars) => {
596                    let response = DataResponse::Bars(BarsResponse::new(
597                        request_id,
598                        client_id,
599                        bar_type,
600                        bars,
601                        start_nanos,
602                        end_nanos,
603                        clock.get_time_ns(),
604                        params,
605                    ));
606                    if let Err(e) = sender.send(DataEvent::Response(response)) {
607                        tracing::error!("Failed to send bars response: {e}");
608                    }
609                }
610                Err(e) => tracing::error!("Bar request failed: {e:?}"),
611            }
612        });
613
614        Ok(())
615    }
616
617    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
618        tracing::debug!("Requesting trades for {}", request.instrument_id);
619
620        // NOTE: Hyperliquid does not provide public historical trade data via REST API
621        // - Real-time trades are available via WebSocket (subscribe_trades)
622        // - User fills (authenticated) are available via generate_fill_reports
623        // For now, return empty response similar to exchanges without public trade history
624        tracing::warn!(
625            "Historical trade data not available via REST on Hyperliquid for {}",
626            request.instrument_id
627        );
628
629        let trades = Vec::new();
630
631        let response = DataResponse::Trades(TradesResponse::new(
632            request.request_id,
633            request.client_id.unwrap_or(self.client_id),
634            request.instrument_id,
635            trades,
636            datetime_to_unix_nanos(request.start),
637            datetime_to_unix_nanos(request.end),
638            self.clock.get_time_ns(),
639            request.params.clone(),
640        ));
641
642        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
643            tracing::error!("Failed to send trades response: {e}");
644        }
645
646        Ok(())
647    }
648
649    fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
650        tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
651
652        let ws = self.ws_client.clone();
653        let instrument_id = subscription.instrument_id;
654
655        get_runtime().spawn(async move {
656            if let Err(e) = ws.subscribe_trades(instrument_id).await {
657                tracing::error!("Failed to subscribe to trades: {e:?}");
658            }
659        });
660
661        Ok(())
662    }
663
664    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
665        tracing::debug!(
666            "Unsubscribing from trades: {}",
667            unsubscription.instrument_id
668        );
669
670        let ws = self.ws_client.clone();
671        let instrument_id = unsubscription.instrument_id;
672
673        get_runtime().spawn(async move {
674            if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
675                tracing::error!("Failed to unsubscribe from trades: {e:?}");
676            }
677        });
678
679        tracing::info!(
680            "Unsubscribed from trades for {}",
681            unsubscription.instrument_id
682        );
683
684        Ok(())
685    }
686
687    fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
688        tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
689
690        if subscription.book_type != BookType::L2_MBP {
691            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
692        }
693
694        let ws = self.ws_client.clone();
695        let instrument_id = subscription.instrument_id;
696
697        get_runtime().spawn(async move {
698            if let Err(e) = ws.subscribe_book(instrument_id).await {
699                tracing::error!("Failed to subscribe to book deltas: {e:?}");
700            }
701        });
702
703        Ok(())
704    }
705
706    fn unsubscribe_book_deltas(
707        &mut self,
708        unsubscription: &UnsubscribeBookDeltas,
709    ) -> anyhow::Result<()> {
710        tracing::debug!(
711            "Unsubscribing from book deltas: {}",
712            unsubscription.instrument_id
713        );
714
715        let ws = self.ws_client.clone();
716        let instrument_id = unsubscription.instrument_id;
717
718        get_runtime().spawn(async move {
719            if let Err(e) = ws.unsubscribe_book(instrument_id).await {
720                tracing::error!("Failed to unsubscribe from book deltas: {e:?}");
721            }
722        });
723
724        Ok(())
725    }
726
727    fn subscribe_book_snapshots(
728        &mut self,
729        subscription: &SubscribeBookSnapshots,
730    ) -> anyhow::Result<()> {
731        tracing::debug!(
732            "Subscribing to book snapshots: {}",
733            subscription.instrument_id
734        );
735
736        if subscription.book_type != BookType::L2_MBP {
737            anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
738        }
739
740        let ws = self.ws_client.clone();
741        let instrument_id = subscription.instrument_id;
742
743        get_runtime().spawn(async move {
744            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
745                tracing::error!("Failed to subscribe to book snapshots: {e:?}");
746            }
747        });
748
749        Ok(())
750    }
751
752    fn unsubscribe_book_snapshots(
753        &mut self,
754        unsubscription: &UnsubscribeBookSnapshots,
755    ) -> anyhow::Result<()> {
756        tracing::debug!(
757            "Unsubscribing from book snapshots: {}",
758            unsubscription.instrument_id
759        );
760
761        let ws = self.ws_client.clone();
762        let instrument_id = unsubscription.instrument_id;
763
764        get_runtime().spawn(async move {
765            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
766                tracing::error!("Failed to unsubscribe from book snapshots: {e:?}");
767            }
768        });
769
770        Ok(())
771    }
772
773    fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
774        tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
775
776        let ws = self.ws_client.clone();
777        let instrument_id = subscription.instrument_id;
778
779        get_runtime().spawn(async move {
780            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
781                tracing::error!("Failed to subscribe to quotes: {e:?}");
782            }
783        });
784
785        Ok(())
786    }
787
788    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
789        tracing::debug!(
790            "Unsubscribing from quotes: {}",
791            unsubscription.instrument_id
792        );
793
794        let ws = self.ws_client.clone();
795        let instrument_id = unsubscription.instrument_id;
796
797        get_runtime().spawn(async move {
798            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
799                tracing::error!("Failed to unsubscribe from quotes: {e:?}");
800            }
801        });
802
803        tracing::info!(
804            "Unsubscribed from quotes for {}",
805            unsubscription.instrument_id
806        );
807
808        Ok(())
809    }
810
811    fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
812        tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
813
814        let instruments = self.instruments.read().unwrap();
815        let instrument_id = subscription.bar_type.instrument_id();
816        if !instruments.contains_key(&instrument_id) {
817            anyhow::bail!("Instrument {instrument_id} not found");
818        }
819
820        drop(instruments);
821
822        let bar_type = subscription.bar_type;
823        let ws = self.ws_client.clone();
824
825        get_runtime().spawn(async move {
826            if let Err(e) = ws.subscribe_bars(bar_type).await {
827                tracing::error!("Failed to subscribe to bars: {e:?}");
828            }
829        });
830
831        tracing::info!("Subscribed to bars for {}", subscription.bar_type);
832
833        Ok(())
834    }
835
836    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
837        tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
838
839        let bar_type = unsubscription.bar_type;
840        let ws = self.ws_client.clone();
841
842        get_runtime().spawn(async move {
843            if let Err(e) = ws.unsubscribe_bars(bar_type).await {
844                tracing::error!("Failed to unsubscribe from bars: {e:?}");
845            }
846        });
847
848        tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
849
850        Ok(())
851    }
852}
853
854pub(crate) fn candle_to_bar(
855    candle: &HyperliquidCandle,
856    bar_type: BarType,
857    price_precision: u8,
858    size_precision: u8,
859) -> anyhow::Result<Bar> {
860    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
861    let ts_event = ts_init;
862
863    let open = candle.open.parse::<f64>().context("parse open price")?;
864    let high = candle.high.parse::<f64>().context("parse high price")?;
865    let low = candle.low.parse::<f64>().context("parse low price")?;
866    let close = candle.close.parse::<f64>().context("parse close price")?;
867    let volume = candle.volume.parse::<f64>().context("parse volume")?;
868
869    Ok(Bar::new(
870        bar_type,
871        Price::new(open, price_precision),
872        Price::new(high, price_precision),
873        Price::new(low, price_precision),
874        Price::new(close, price_precision),
875        Quantity::new(volume, size_precision),
876        ts_event,
877        ts_init,
878    ))
879}
880
881/// Request bars from HTTP API.
882async fn request_bars_from_http(
883    http_client: HyperliquidHttpClient,
884    bar_type: BarType,
885    start: Option<DateTime<Utc>>,
886    end: Option<DateTime<Utc>>,
887    limit: Option<u32>,
888    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
889) -> anyhow::Result<Vec<Bar>> {
890    // Get instrument details for precision
891    let instrument_id = bar_type.instrument_id();
892    let instrument = {
893        let guard = instruments.read().unwrap();
894        guard
895            .get(&instrument_id)
896            .cloned()
897            .context("instrument not found in cache")?
898    };
899
900    let price_precision = instrument.price_precision();
901    let size_precision = instrument.size_precision();
902
903    // Extract coin symbol from instrument ID (e.g., "BTC-PERP.HYPERLIQUID" -> "BTC")
904    let coin = instrument_id
905        .symbol
906        .as_str()
907        .split('-')
908        .next()
909        .context("invalid instrument symbol")?;
910
911    let interval = bar_type_to_interval(&bar_type)?;
912
913    // Hyperliquid uses millisecond timestamps
914    let now = Utc::now();
915    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
916    let start_time = if let Some(start) = start {
917        start.timestamp_millis() as u64
918    } else {
919        // Default to 1000 bars before end_time
920        let spec = bar_type.spec();
921        let step_ms = match spec.aggregation {
922            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
923            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
924            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
925            _ => 60_000,
926        };
927        end_time.saturating_sub(1000 * step_ms)
928    };
929
930    let candles = http_client
931        .info_candle_snapshot(coin, interval, start_time, end_time)
932        .await
933        .context("failed to fetch candle snapshot from Hyperliquid")?;
934
935    let mut bars: Vec<Bar> = candles
936        .iter()
937        .filter_map(|candle| {
938            candle_to_bar(candle, bar_type, price_precision, size_precision)
939                .map_err(|e| {
940                    tracing::warn!("Failed to convert candle to bar: {e}");
941                    e
942                })
943                .ok()
944        })
945        .collect();
946
947    if let Some(limit) = limit
948        && bars.len() > limit as usize
949    {
950        bars = bars.into_iter().take(limit as usize).collect();
951    }
952
953    tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
954    Ok(bars)
955}