nautilus_hyperliquid/data/
mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc, RwLock,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    clients::DataClient,
26    live::{runner::get_data_event_sender, runtime::get_runtime},
27    messages::{
28        DataEvent,
29        data::{
30            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
31            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
32            SubscribeBookDeltas, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
33            UnsubscribeBookDeltas, UnsubscribeQuotes, UnsubscribeTrades,
34        },
35    },
36};
37use nautilus_core::{
38    UnixNanos,
39    datetime::datetime_to_unix_nanos,
40    time::{AtomicTime, get_atomic_clock_realtime},
41};
42use nautilus_model::{
43    data::{Bar, BarType, Data, OrderBookDeltas_API},
44    enums::{BarAggregation, BookType},
45    identifiers::{ClientId, InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47    types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54    common::{HyperliquidProductType, consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
55    config::HyperliquidDataClientConfig,
56    http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57    websocket::{
58        client::HyperliquidWebSocketClient,
59        messages::{HyperliquidWsMessage, NautilusWsMessage},
60        parse::{
61            parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
62        },
63    },
64};
65
66#[derive(Debug)]
67pub struct HyperliquidDataClient {
68    client_id: ClientId,
69    #[allow(dead_code)]
70    config: HyperliquidDataClientConfig,
71    http_client: HyperliquidHttpClient,
72    ws_client: HyperliquidWebSocketClient,
73    is_connected: AtomicBool,
74    cancellation_token: CancellationToken,
75    tasks: Vec<JoinHandle<()>>,
76    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
77    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
78    /// Maps coin symbols (e.g., "BTC") to instrument IDs (e.g., "BTC-PERP")
79    /// for efficient O(1) lookup in WebSocket message handlers
80    coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
81    clock: &'static AtomicTime,
82    #[allow(dead_code)]
83    instrument_refresh_active: bool,
84}
85
86impl HyperliquidDataClient {
87    /// Creates a new [`HyperliquidDataClient`] instance.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the HTTP client fails to initialize.
92    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
93        let clock = get_atomic_clock_realtime();
94        let data_sender = get_data_event_sender();
95
96        let http_client = if let Some(private_key_str) = &config.private_key {
97            let secrets = crate::common::credential::Secrets {
98                private_key: crate::common::credential::EvmPrivateKey::new(
99                    private_key_str.clone(),
100                )?,
101                is_testnet: config.is_testnet,
102                vault_address: None,
103            };
104            HyperliquidHttpClient::with_credentials(
105                &secrets,
106                config.http_timeout_secs,
107                config.http_proxy_url.clone(),
108            )?
109        } else {
110            HyperliquidHttpClient::new(
111                config.is_testnet,
112                config.http_timeout_secs,
113                config.http_proxy_url.clone(),
114            )?
115        };
116
117        // Note: Rust data client is not the primary interface; Python adapter is used instead.
118        // Defaulting to Perp for basic functionality.
119        let ws_client = HyperliquidWebSocketClient::new(
120            None,
121            config.is_testnet,
122            HyperliquidProductType::Perp,
123            None,
124        );
125
126        Ok(Self {
127            client_id,
128            config,
129            http_client,
130            ws_client,
131            is_connected: AtomicBool::new(false),
132            cancellation_token: CancellationToken::new(),
133            tasks: Vec::new(),
134            data_sender,
135            instruments: Arc::new(RwLock::new(AHashMap::new())),
136            coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
137            clock,
138            instrument_refresh_active: false,
139        })
140    }
141
142    fn venue(&self) -> Venue {
143        *HYPERLIQUID_VENUE
144    }
145
146    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
147        let instruments = self
148            .http_client
149            .request_instruments()
150            .await
151            .context("failed to fetch instruments during bootstrap")?;
152
153        let mut instruments_map = self.instruments.write().unwrap();
154        let mut coin_map = self.coin_to_instrument_id.write().unwrap();
155
156        for instrument in &instruments {
157            let instrument_id = instrument.id();
158            instruments_map.insert(instrument_id, instrument.clone());
159
160            // Build coin-to-instrument-id index for efficient WebSocket message lookup
161            // Use raw_symbol which contains Hyperliquid's coin ticker (e.g., "BTC")
162            let coin = instrument.raw_symbol().inner();
163            if instrument_id.symbol.as_str().starts_with("BTCUSD") {
164                log::warn!(
165                    "DEBUG bootstrap BTCUSD: instrument_id={}, raw_symbol={}, coin={}",
166                    instrument_id,
167                    instrument.raw_symbol(),
168                    coin
169                );
170            }
171            coin_map.insert(coin, instrument_id);
172
173            self.ws_client.cache_instrument(instrument.clone());
174        }
175
176        log::info!(
177            "Bootstrapped {} instruments with {} coin mappings",
178            instruments_map.len(),
179            coin_map.len()
180        );
181        Ok(instruments)
182    }
183
184    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
185        // Clone client before connecting so the clone can have out_rx set
186        let mut ws_client = self.ws_client.clone();
187
188        ws_client
189            .connect()
190            .await
191            .context("failed to connect to Hyperliquid WebSocket")?;
192
193        let _data_sender = self.data_sender.clone();
194        let _instruments = Arc::clone(&self.instruments);
195        let _coin_to_instrument_id = Arc::clone(&self.coin_to_instrument_id);
196        let _venue = self.venue();
197        let _clock = self.clock;
198        let cancellation_token = self.cancellation_token.clone();
199
200        let task = get_runtime().spawn(async move {
201            log::info!("Hyperliquid WebSocket consumption loop started");
202
203            loop {
204                tokio::select! {
205                    () = cancellation_token.cancelled() => {
206                        log::info!("WebSocket consumption loop cancelled");
207                        break;
208                    }
209                    msg_opt = ws_client.next_event() => {
210                        if let Some(msg) = msg_opt {
211                            match msg {
212                                // Handled by python/websocket.rs
213                                NautilusWsMessage::Trades(_)
214                                | NautilusWsMessage::Quote(_)
215                                | NautilusWsMessage::Deltas(_)
216                                | NautilusWsMessage::Candle(_)
217                                | NautilusWsMessage::MarkPrice(_)
218                                | NautilusWsMessage::IndexPrice(_)
219                                | NautilusWsMessage::FundingRate(_) => {}
220                                NautilusWsMessage::Reconnected => {
221                                    log::info!("WebSocket reconnected");
222                                }
223                                NautilusWsMessage::Error(e) => {
224                                    log::error!("WebSocket error: {e}");
225                                }
226                                NautilusWsMessage::ExecutionReports(_) => {
227                                    // Handled by execution client
228                                }
229                            }
230                        } else {
231                            // Connection closed or error
232                            log::warn!("WebSocket next_event returned None, connection may be closed");
233                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
234                        }
235                    }
236                }
237            }
238
239            log::info!("Hyperliquid WebSocket consumption loop finished");
240        });
241
242        self.tasks.push(task);
243        log::info!("WebSocket consumption task spawned");
244
245        Ok(())
246    }
247
248    #[allow(dead_code)]
249    fn handle_ws_message(
250        msg: HyperliquidWsMessage,
251        ws_client: &HyperliquidWebSocketClient,
252        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
253        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
254        coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
255        _venue: Venue,
256        clock: &'static AtomicTime,
257    ) {
258        match msg {
259            HyperliquidWsMessage::Bbo { data } => {
260                let coin = data.coin;
261                log::debug!("Received BBO message for coin: {coin}");
262
263                // Use efficient O(1) lookup instead of iterating through all instruments
264                // Hyperliquid WebSocket sends coin="BTC", lookup returns "BTC-PERP" instrument ID
265                let coin_map = coin_to_instrument_id.read().unwrap();
266                let instrument_id = coin_map.get(&data.coin);
267
268                if let Some(&instrument_id) = instrument_id {
269                    let instruments_map = instruments.read().unwrap();
270                    if let Some(instrument) = instruments_map.get(&instrument_id) {
271                        let ts_init = clock.get_time_ns();
272
273                        match parse_ws_quote_tick(&data, instrument, ts_init) {
274                            Ok(quote_tick) => {
275                                log::debug!(
276                                    "Parsed quote tick for {}: bid={}, ask={}",
277                                    data.coin,
278                                    quote_tick.bid_price,
279                                    quote_tick.ask_price
280                                );
281                                if let Err(e) =
282                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
283                                {
284                                    log::error!("Failed to send quote tick: {e}");
285                                }
286                            }
287                            Err(e) => {
288                                log::error!("Failed to parse quote tick for {}: {e}", data.coin);
289                            }
290                        }
291                    }
292                } else {
293                    log::warn!(
294                        "Received BBO for unknown coin: {} (no matching instrument found)",
295                        data.coin
296                    );
297                }
298            }
299            HyperliquidWsMessage::Trades { data } => {
300                let count = data.len();
301                log::debug!("Received {count} trade(s)");
302
303                // Process each trade in the batch
304                for trade_data in data {
305                    let coin = trade_data.coin;
306                    let coin_map = coin_to_instrument_id.read().unwrap();
307
308                    if let Some(&instrument_id) = coin_map.get(&coin) {
309                        let instruments_map = instruments.read().unwrap();
310                        if let Some(instrument) = instruments_map.get(&instrument_id) {
311                            let ts_init = clock.get_time_ns();
312
313                            match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
314                                Ok(trade_tick) => {
315                                    if let Err(e) =
316                                        data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
317                                    {
318                                        log::error!("Failed to send trade tick: {e}");
319                                    }
320                                }
321                                Err(e) => {
322                                    log::error!("Failed to parse trade tick for {coin}: {e}");
323                                }
324                            }
325                        }
326                    } else {
327                        log::warn!("Received trade for unknown coin: {coin}");
328                    }
329                }
330            }
331            HyperliquidWsMessage::L2Book { data } => {
332                let coin = data.coin;
333                log::debug!("Received L2 book update for coin: {coin}");
334
335                let coin_map = coin_to_instrument_id.read().unwrap();
336                if let Some(&instrument_id) = coin_map.get(&data.coin) {
337                    let instruments_map = instruments.read().unwrap();
338                    if let Some(instrument) = instruments_map.get(&instrument_id) {
339                        let ts_init = clock.get_time_ns();
340
341                        match parse_ws_order_book_deltas(&data, instrument, ts_init) {
342                            Ok(deltas) => {
343                                if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
344                                    OrderBookDeltas_API::new(deltas),
345                                ))) {
346                                    log::error!("Failed to send order book deltas: {e}");
347                                }
348                            }
349                            Err(e) => {
350                                log::error!(
351                                    "Failed to parse order book deltas for {}: {e}",
352                                    data.coin
353                                );
354                            }
355                        }
356                    }
357                } else {
358                    log::warn!("Received L2 book for unknown coin: {coin}");
359                }
360            }
361            HyperliquidWsMessage::Candle { data } => {
362                let coin = &data.s;
363                let interval = &data.i;
364                log::debug!("Received candle for {coin}:{interval}");
365
366                if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
367                    let coin = Ustr::from(&data.s);
368                    let coin_map = coin_to_instrument_id.read().unwrap();
369
370                    if let Some(&instrument_id) = coin_map.get(&coin) {
371                        let instruments_map = instruments.read().unwrap();
372                        if let Some(instrument) = instruments_map.get(&instrument_id) {
373                            let ts_init = clock.get_time_ns();
374
375                            match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
376                                Ok(bar) => {
377                                    if let Err(e) =
378                                        data_sender.send(DataEvent::Data(Data::Bar(bar)))
379                                    {
380                                        log::error!("Failed to send bar data: {e}");
381                                    }
382                                }
383                                Err(e) => {
384                                    log::error!("Failed to parse candle for {coin}: {e}");
385                                }
386                            }
387                        }
388                    } else {
389                        log::warn!("Received candle for unknown coin: {coin}");
390                    }
391                } else {
392                    log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
393                }
394            }
395            _ => {
396                // Log other message types for debugging
397                log::trace!("Received unhandled WebSocket message: {msg:?}");
398            }
399        }
400    }
401
402    fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
403        let instruments = self.instruments.read().unwrap();
404        instruments
405            .get(instrument_id)
406            .cloned()
407            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
408    }
409}
410
411impl HyperliquidDataClient {
412    #[allow(dead_code)]
413    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
414        if let Err(e) = sender.send(DataEvent::Data(data)) {
415            log::error!("Failed to emit data event: {e}");
416        }
417    }
418}
419
420#[async_trait::async_trait(?Send)]
421impl DataClient for HyperliquidDataClient {
422    fn client_id(&self) -> ClientId {
423        self.client_id
424    }
425
426    fn venue(&self) -> Option<Venue> {
427        Some(self.venue())
428    }
429
430    fn start(&mut self) -> anyhow::Result<()> {
431        log::info!(
432            "Starting Hyperliquid data client: client_id={}, is_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
433            self.client_id,
434            self.config.is_testnet,
435            self.config.http_proxy_url,
436            self.config.ws_proxy_url,
437        );
438        Ok(())
439    }
440
441    fn stop(&mut self) -> anyhow::Result<()> {
442        log::info!("Stopping Hyperliquid data client {}", self.client_id);
443        self.cancellation_token.cancel();
444        self.is_connected.store(false, Ordering::Relaxed);
445        Ok(())
446    }
447
448    fn reset(&mut self) -> anyhow::Result<()> {
449        log::debug!("Resetting Hyperliquid data client {}", self.client_id);
450        self.is_connected.store(false, Ordering::Relaxed);
451        self.cancellation_token = CancellationToken::new();
452        self.tasks.clear();
453        Ok(())
454    }
455
456    fn dispose(&mut self) -> anyhow::Result<()> {
457        log::debug!("Disposing Hyperliquid data client {}", self.client_id);
458        self.stop()
459    }
460
461    fn is_connected(&self) -> bool {
462        self.is_connected.load(Ordering::Acquire)
463    }
464
465    fn is_disconnected(&self) -> bool {
466        !self.is_connected()
467    }
468
469    async fn connect(&mut self) -> anyhow::Result<()> {
470        if self.is_connected() {
471            return Ok(());
472        }
473
474        // Bootstrap instruments from HTTP API
475        let _instruments = self
476            .bootstrap_instruments()
477            .await
478            .context("failed to bootstrap instruments")?;
479
480        // Connect WebSocket client
481        self.spawn_ws()
482            .await
483            .context("failed to spawn WebSocket client")?;
484
485        self.is_connected.store(true, Ordering::Relaxed);
486        log::info!("Connected: client_id={}", self.client_id);
487
488        Ok(())
489    }
490
491    async fn disconnect(&mut self) -> anyhow::Result<()> {
492        if !self.is_connected() {
493            return Ok(());
494        }
495
496        // Cancel all tasks
497        self.cancellation_token.cancel();
498
499        // Wait for all tasks to complete
500        for task in self.tasks.drain(..) {
501            if let Err(e) = task.await {
502                log::error!("Error waiting for task to complete: {e}");
503            }
504        }
505
506        // Disconnect WebSocket client
507        if let Err(e) = self.ws_client.disconnect().await {
508            log::error!("Error disconnecting WebSocket client: {e}");
509        }
510
511        // Clear state
512        {
513            let mut instruments = self.instruments.write().unwrap();
514            instruments.clear();
515        }
516
517        self.is_connected.store(false, Ordering::Relaxed);
518        log::info!("Disconnected: client_id={}", self.client_id);
519
520        Ok(())
521    }
522
523    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
524        log::debug!("Requesting all instruments");
525
526        let instruments = {
527            let instruments_map = self.instruments.read().unwrap();
528            instruments_map.values().cloned().collect()
529        };
530
531        let response = DataResponse::Instruments(InstrumentsResponse::new(
532            request.request_id,
533            request.client_id.unwrap_or(self.client_id),
534            self.venue(),
535            instruments,
536            datetime_to_unix_nanos(request.start),
537            datetime_to_unix_nanos(request.end),
538            self.clock.get_time_ns(),
539            request.params.clone(),
540        ));
541
542        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
543            log::error!("Failed to send instruments response: {e}");
544        }
545
546        Ok(())
547    }
548
549    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
550        log::debug!("Requesting instrument: {}", request.instrument_id);
551
552        let instrument = self.get_instrument(&request.instrument_id)?;
553
554        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
555            request.request_id,
556            request.client_id.unwrap_or(self.client_id),
557            instrument.id(),
558            instrument,
559            datetime_to_unix_nanos(request.start),
560            datetime_to_unix_nanos(request.end),
561            self.clock.get_time_ns(),
562            request.params.clone(),
563        )));
564
565        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
566            log::error!("Failed to send instrument response: {e}");
567        }
568
569        Ok(())
570    }
571
572    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
573        log::debug!("Requesting bars for {}", request.bar_type);
574
575        let http = self.http_client.clone();
576        let sender = self.data_sender.clone();
577        let bar_type = request.bar_type;
578        let start = request.start;
579        let end = request.end;
580        let limit = request.limit.map(|n| n.get() as u32);
581        let request_id = request.request_id;
582        let client_id = request.client_id.unwrap_or(self.client_id);
583        let params = request.params.clone();
584        let clock = self.clock;
585        let start_nanos = datetime_to_unix_nanos(start);
586        let end_nanos = datetime_to_unix_nanos(end);
587        let instruments = Arc::clone(&self.instruments);
588
589        get_runtime().spawn(async move {
590            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
591                Ok(bars) => {
592                    let response = DataResponse::Bars(BarsResponse::new(
593                        request_id,
594                        client_id,
595                        bar_type,
596                        bars,
597                        start_nanos,
598                        end_nanos,
599                        clock.get_time_ns(),
600                        params,
601                    ));
602                    if let Err(e) = sender.send(DataEvent::Response(response)) {
603                        log::error!("Failed to send bars response: {e}");
604                    }
605                }
606                Err(e) => log::error!("Bar request failed: {e:?}"),
607            }
608        });
609
610        Ok(())
611    }
612
613    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
614        log::debug!("Requesting trades for {}", request.instrument_id);
615
616        // NOTE: Hyperliquid does not provide public historical trade data via REST API
617        // - Real-time trades are available via WebSocket (subscribe_trades)
618        // - User fills (authenticated) are available via generate_fill_reports
619        // For now, return empty response similar to exchanges without public trade history
620        log::warn!(
621            "Historical trade data not available via REST on Hyperliquid for {}",
622            request.instrument_id
623        );
624
625        let trades = Vec::new();
626
627        let response = DataResponse::Trades(TradesResponse::new(
628            request.request_id,
629            request.client_id.unwrap_or(self.client_id),
630            request.instrument_id,
631            trades,
632            datetime_to_unix_nanos(request.start),
633            datetime_to_unix_nanos(request.end),
634            self.clock.get_time_ns(),
635            request.params.clone(),
636        ));
637
638        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
639            log::error!("Failed to send trades response: {e}");
640        }
641
642        Ok(())
643    }
644
645    fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
646        log::debug!("Subscribing to trades: {}", subscription.instrument_id);
647
648        let ws = self.ws_client.clone();
649        let instrument_id = subscription.instrument_id;
650
651        get_runtime().spawn(async move {
652            if let Err(e) = ws.subscribe_trades(instrument_id).await {
653                log::error!("Failed to subscribe to trades: {e:?}");
654            }
655        });
656
657        Ok(())
658    }
659
660    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
661        log::debug!(
662            "Unsubscribing from trades: {}",
663            unsubscription.instrument_id
664        );
665
666        let ws = self.ws_client.clone();
667        let instrument_id = unsubscription.instrument_id;
668
669        get_runtime().spawn(async move {
670            if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
671                log::error!("Failed to unsubscribe from trades: {e:?}");
672            }
673        });
674
675        log::info!(
676            "Unsubscribed from trades for {}",
677            unsubscription.instrument_id
678        );
679
680        Ok(())
681    }
682
683    fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
684        log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
685
686        if subscription.book_type != BookType::L2_MBP {
687            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
688        }
689
690        let ws = self.ws_client.clone();
691        let instrument_id = subscription.instrument_id;
692
693        get_runtime().spawn(async move {
694            if let Err(e) = ws.subscribe_book(instrument_id).await {
695                log::error!("Failed to subscribe to book deltas: {e:?}");
696            }
697        });
698
699        Ok(())
700    }
701
702    fn unsubscribe_book_deltas(
703        &mut self,
704        unsubscription: &UnsubscribeBookDeltas,
705    ) -> anyhow::Result<()> {
706        log::debug!(
707            "Unsubscribing from book deltas: {}",
708            unsubscription.instrument_id
709        );
710
711        let ws = self.ws_client.clone();
712        let instrument_id = unsubscription.instrument_id;
713
714        get_runtime().spawn(async move {
715            if let Err(e) = ws.unsubscribe_book(instrument_id).await {
716                log::error!("Failed to unsubscribe from book deltas: {e:?}");
717            }
718        });
719
720        Ok(())
721    }
722
723    fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
724        log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
725
726        let ws = self.ws_client.clone();
727        let instrument_id = subscription.instrument_id;
728
729        get_runtime().spawn(async move {
730            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
731                log::error!("Failed to subscribe to quotes: {e:?}");
732            }
733        });
734
735        Ok(())
736    }
737
738    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
739        log::debug!(
740            "Unsubscribing from quotes: {}",
741            unsubscription.instrument_id
742        );
743
744        let ws = self.ws_client.clone();
745        let instrument_id = unsubscription.instrument_id;
746
747        get_runtime().spawn(async move {
748            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
749                log::error!("Failed to unsubscribe from quotes: {e:?}");
750            }
751        });
752
753        log::info!(
754            "Unsubscribed from quotes for {}",
755            unsubscription.instrument_id
756        );
757
758        Ok(())
759    }
760
761    fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
762        log::debug!("Subscribing to bars: {}", subscription.bar_type);
763
764        let instruments = self.instruments.read().unwrap();
765        let instrument_id = subscription.bar_type.instrument_id();
766        if !instruments.contains_key(&instrument_id) {
767            anyhow::bail!("Instrument {instrument_id} not found");
768        }
769
770        drop(instruments);
771
772        let bar_type = subscription.bar_type;
773        let ws = self.ws_client.clone();
774
775        get_runtime().spawn(async move {
776            if let Err(e) = ws.subscribe_bars(bar_type).await {
777                log::error!("Failed to subscribe to bars: {e:?}");
778            }
779        });
780
781        log::info!("Subscribed to bars for {}", subscription.bar_type);
782
783        Ok(())
784    }
785
786    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
787        log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
788
789        let bar_type = unsubscription.bar_type;
790        let ws = self.ws_client.clone();
791
792        get_runtime().spawn(async move {
793            if let Err(e) = ws.unsubscribe_bars(bar_type).await {
794                log::error!("Failed to unsubscribe from bars: {e:?}");
795            }
796        });
797
798        log::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
799
800        Ok(())
801    }
802}
803
804pub(crate) fn candle_to_bar(
805    candle: &HyperliquidCandle,
806    bar_type: BarType,
807    price_precision: u8,
808    size_precision: u8,
809) -> anyhow::Result<Bar> {
810    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
811    let ts_event = ts_init;
812
813    let open = candle.open.parse::<f64>().context("parse open price")?;
814    let high = candle.high.parse::<f64>().context("parse high price")?;
815    let low = candle.low.parse::<f64>().context("parse low price")?;
816    let close = candle.close.parse::<f64>().context("parse close price")?;
817    let volume = candle.volume.parse::<f64>().context("parse volume")?;
818
819    Ok(Bar::new(
820        bar_type,
821        Price::new(open, price_precision),
822        Price::new(high, price_precision),
823        Price::new(low, price_precision),
824        Price::new(close, price_precision),
825        Quantity::new(volume, size_precision),
826        ts_event,
827        ts_init,
828    ))
829}
830
831/// Request bars from HTTP API.
832async fn request_bars_from_http(
833    http_client: HyperliquidHttpClient,
834    bar_type: BarType,
835    start: Option<DateTime<Utc>>,
836    end: Option<DateTime<Utc>>,
837    limit: Option<u32>,
838    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
839) -> anyhow::Result<Vec<Bar>> {
840    // Get instrument details for precision
841    let instrument_id = bar_type.instrument_id();
842    let instrument = {
843        let guard = instruments.read().unwrap();
844        guard
845            .get(&instrument_id)
846            .cloned()
847            .context("instrument not found in cache")?
848    };
849
850    let price_precision = instrument.price_precision();
851    let size_precision = instrument.size_precision();
852
853    // Extract coin symbol from instrument ID (e.g., "BTC-PERP.HYPERLIQUID" -> "BTC")
854    let coin = instrument_id
855        .symbol
856        .as_str()
857        .split('-')
858        .next()
859        .context("invalid instrument symbol")?;
860
861    let interval = bar_type_to_interval(&bar_type)?;
862
863    // Hyperliquid uses millisecond timestamps
864    let now = Utc::now();
865    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
866    let start_time = if let Some(start) = start {
867        start.timestamp_millis() as u64
868    } else {
869        // Default to 1000 bars before end_time
870        let spec = bar_type.spec();
871        let step_ms = match spec.aggregation {
872            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
873            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
874            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
875            _ => 60_000,
876        };
877        end_time.saturating_sub(1000 * step_ms)
878    };
879
880    let candles = http_client
881        .info_candle_snapshot(coin, interval, start_time, end_time)
882        .await
883        .context("failed to fetch candle snapshot from Hyperliquid")?;
884
885    let mut bars: Vec<Bar> = candles
886        .iter()
887        .filter_map(|candle| {
888            candle_to_bar(candle, bar_type, price_precision, size_precision)
889                .map_err(|e| {
890                    log::warn!("Failed to convert candle to bar: {e}");
891                    e
892                })
893                .ok()
894        })
895        .collect();
896
897    if let Some(limit) = limit
898        && bars.len() > limit as usize
899    {
900        bars = bars.into_iter().take(limit as usize).collect();
901    }
902
903    log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
904    Ok(bars)
905}