nautilus_hyperliquid/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc, RwLock,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    messages::{
26        DataEvent,
27        data::{
28            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
29            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
30            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
31            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
32            UnsubscribeQuotes, UnsubscribeTrades,
33        },
34    },
35    runner::get_data_event_sender,
36};
37use nautilus_core::{
38    UnixNanos,
39    time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_data::client::DataClient;
42use nautilus_model::{
43    data::{Bar, BarType, Data, OrderBookDeltas_API},
44    enums::BarAggregation,
45    identifiers::{ClientId, InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47    types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54    common::{HyperliquidProductType, consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
55    config::HyperliquidDataClientConfig,
56    http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57    websocket::{
58        client::HyperliquidWebSocketClient,
59        messages::{HyperliquidWsMessage, NautilusWsMessage},
60        parse::{
61            parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
62        },
63    },
64};
65
/// Data client for the Hyperliquid venue.
///
/// Bootstraps instruments over HTTP, owns a WebSocket client used for market data
/// subscriptions, and emits responses on the shared data event channel.
#[derive(Debug)]
pub struct HyperliquidDataClient {
    /// Identifier reported by `client_id()` and used as the default client ID on responses.
    client_id: ClientId,
    /// Configuration the client was created with (network selection, timeouts, proxy URLs).
    #[allow(dead_code)]
    config: HyperliquidDataClientConfig,
    /// HTTP client used for the instrument bootstrap and historical bar requests.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; cloned for each subscribe/unsubscribe request.
    ws_client: HyperliquidWebSocketClient,
    /// Set once `connect` succeeds and cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Signals the spawned WebSocket consumption loop to exit.
    cancellation_token: CancellationToken,
    /// Handles of spawned background tasks, awaited during `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instruments keyed by instrument ID, populated by the HTTP bootstrap.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Maps coin symbols (e.g., "BTC") to instrument IDs (e.g., "BTC-PERP")
    /// for efficient O(1) lookup in WebSocket message handlers
    coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
    /// Clock used to stamp `ts_init` on parsed data and responses.
    clock: &'static AtomicTime,
    /// Initialised to `false` in `new`; not read anywhere in the visible code.
    #[allow(dead_code)]
    instrument_refresh_active: bool,
}
85
86impl HyperliquidDataClient {
87    /// Creates a new [`HyperliquidDataClient`] instance.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the HTTP client fails to initialize.
92    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
93        let clock = get_atomic_clock_realtime();
94        let data_sender = get_data_event_sender();
95
96        let http_client = if let Some(private_key_str) = &config.private_key {
97            let secrets = crate::common::credential::Secrets {
98                private_key: crate::common::credential::EvmPrivateKey::new(
99                    private_key_str.clone(),
100                )?,
101                is_testnet: config.is_testnet,
102                vault_address: None,
103            };
104            HyperliquidHttpClient::with_credentials(
105                &secrets,
106                config.http_timeout_secs,
107                config.http_proxy_url.clone(),
108            )?
109        } else {
110            HyperliquidHttpClient::new(
111                config.is_testnet,
112                config.http_timeout_secs,
113                config.http_proxy_url.clone(),
114            )?
115        };
116
117        // Note: Rust data client is not the primary interface; Python adapter is used instead.
118        // Defaulting to Perp for basic functionality.
119        let ws_client = HyperliquidWebSocketClient::new(
120            None,
121            config.is_testnet,
122            HyperliquidProductType::Perp,
123            None,
124        );
125
126        Ok(Self {
127            client_id,
128            config,
129            http_client,
130            ws_client,
131            is_connected: AtomicBool::new(false),
132            cancellation_token: CancellationToken::new(),
133            tasks: Vec::new(),
134            data_sender,
135            instruments: Arc::new(RwLock::new(AHashMap::new())),
136            coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
137            clock,
138            instrument_refresh_active: false,
139        })
140    }
141
    /// Returns the Hyperliquid venue constant.
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
145
146    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
147        let instruments = self
148            .http_client
149            .request_instruments()
150            .await
151            .context("failed to fetch instruments during bootstrap")?;
152
153        let mut instruments_map = self.instruments.write().unwrap();
154        let mut coin_map = self.coin_to_instrument_id.write().unwrap();
155
156        for instrument in &instruments {
157            let instrument_id = instrument.id();
158            instruments_map.insert(instrument_id, instrument.clone());
159
160            // Build coin-to-instrument-id index for efficient WebSocket message lookup
161            // Use raw_symbol which contains Hyperliquid's coin ticker (e.g., "BTC")
162            let coin = instrument.raw_symbol().inner();
163            if instrument_id.symbol.as_str().starts_with("BTCUSD") {
164                log::warn!(
165                    "DEBUG bootstrap BTCUSD: instrument_id={}, raw_symbol={}, coin={}",
166                    instrument_id,
167                    instrument.raw_symbol(),
168                    coin
169                );
170            }
171            coin_map.insert(coin, instrument_id);
172
173            self.ws_client.cache_instrument(instrument.clone());
174        }
175
176        tracing::info!(
177            "Bootstrapped {} instruments with {} coin mappings",
178            instruments_map.len(),
179            coin_map.len()
180        );
181        Ok(instruments)
182    }
183
184    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
185        // Clone client before connecting so the clone can have out_rx set
186        let mut ws_client = self.ws_client.clone();
187
188        ws_client
189            .connect()
190            .await
191            .context("failed to connect to Hyperliquid WebSocket")?;
192
193        let _data_sender = self.data_sender.clone();
194        let _instruments = Arc::clone(&self.instruments);
195        let _coin_to_instrument_id = Arc::clone(&self.coin_to_instrument_id);
196        let _venue = self.venue();
197        let _clock = self.clock;
198        let cancellation_token = self.cancellation_token.clone();
199
200        let task = tokio::spawn(async move {
201            tracing::info!("Hyperliquid WebSocket consumption loop started");
202
203            loop {
204                tokio::select! {
205                    _ = cancellation_token.cancelled() => {
206                        tracing::info!("WebSocket consumption loop cancelled");
207                        break;
208                    }
209                    msg_opt = ws_client.next_event() => {
210                        if let Some(msg) = msg_opt {
211                            match msg {
212                                // Handled by python/websocket.rs
213                                NautilusWsMessage::Trades(_)
214                                | NautilusWsMessage::Quote(_)
215                                | NautilusWsMessage::Deltas(_)
216                                | NautilusWsMessage::Candle(_)
217                                | NautilusWsMessage::MarkPrice(_)
218                                | NautilusWsMessage::IndexPrice(_)
219                                | NautilusWsMessage::FundingRate(_) => {}
220                                NautilusWsMessage::Reconnected => {
221                                    tracing::info!("WebSocket reconnected");
222                                }
223                                NautilusWsMessage::Error(e) => {
224                                    tracing::error!("WebSocket error: {e}");
225                                }
226                                NautilusWsMessage::ExecutionReports(_) => {
227                                    // Handled by execution client
228                                }
229                            }
230                        } else {
231                            // Connection closed or error
232                            tracing::warn!("WebSocket next_event returned None, connection may be closed");
233                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
234                        }
235                    }
236                }
237            }
238
239            tracing::info!("Hyperliquid WebSocket consumption loop finished");
240        });
241
242        self.tasks.push(task);
243        tracing::info!("WebSocket consumption task spawned");
244
245        Ok(())
246    }
247
248    #[allow(dead_code)]
249    fn handle_ws_message(
250        msg: HyperliquidWsMessage,
251        ws_client: &HyperliquidWebSocketClient,
252        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
253        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
254        coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
255        _venue: Venue,
256        clock: &'static AtomicTime,
257    ) {
258        match msg {
259            HyperliquidWsMessage::Bbo { data } => {
260                let coin = data.coin;
261                tracing::debug!("Received BBO message for coin: {coin}");
262
263                // Use efficient O(1) lookup instead of iterating through all instruments
264                // Hyperliquid WebSocket sends coin="BTC", lookup returns "BTC-PERP" instrument ID
265                let coin_map = coin_to_instrument_id.read().unwrap();
266                let instrument_id = coin_map.get(&data.coin);
267
268                if let Some(&instrument_id) = instrument_id {
269                    let instruments_map = instruments.read().unwrap();
270                    if let Some(instrument) = instruments_map.get(&instrument_id) {
271                        let ts_init = clock.get_time_ns();
272
273                        match parse_ws_quote_tick(&data, instrument, ts_init) {
274                            Ok(quote_tick) => {
275                                tracing::debug!(
276                                    "Parsed quote tick for {}: bid={}, ask={}",
277                                    data.coin,
278                                    quote_tick.bid_price,
279                                    quote_tick.ask_price
280                                );
281                                if let Err(e) =
282                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
283                                {
284                                    tracing::error!("Failed to send quote tick: {e}");
285                                }
286                            }
287                            Err(e) => {
288                                tracing::error!(
289                                    "Failed to parse quote tick for {}: {e}",
290                                    data.coin
291                                );
292                            }
293                        }
294                    }
295                } else {
296                    tracing::warn!(
297                        "Received BBO for unknown coin: {} (no matching instrument found)",
298                        data.coin
299                    );
300                }
301            }
302            HyperliquidWsMessage::Trades { data } => {
303                let count = data.len();
304                tracing::debug!("Received {count} trade(s)");
305
306                // Process each trade in the batch
307                for trade_data in data {
308                    let coin = trade_data.coin;
309                    let coin_map = coin_to_instrument_id.read().unwrap();
310
311                    if let Some(&instrument_id) = coin_map.get(&coin) {
312                        let instruments_map = instruments.read().unwrap();
313                        if let Some(instrument) = instruments_map.get(&instrument_id) {
314                            let ts_init = clock.get_time_ns();
315
316                            match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
317                                Ok(trade_tick) => {
318                                    if let Err(e) =
319                                        data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
320                                    {
321                                        tracing::error!("Failed to send trade tick: {e}");
322                                    }
323                                }
324                                Err(e) => {
325                                    tracing::error!("Failed to parse trade tick for {coin}: {e}");
326                                }
327                            }
328                        }
329                    } else {
330                        tracing::warn!("Received trade for unknown coin: {coin}");
331                    }
332                }
333            }
334            HyperliquidWsMessage::L2Book { data } => {
335                let coin = data.coin;
336                tracing::debug!("Received L2 book update for coin: {coin}");
337
338                let coin_map = coin_to_instrument_id.read().unwrap();
339                if let Some(&instrument_id) = coin_map.get(&data.coin) {
340                    let instruments_map = instruments.read().unwrap();
341                    if let Some(instrument) = instruments_map.get(&instrument_id) {
342                        let ts_init = clock.get_time_ns();
343
344                        match parse_ws_order_book_deltas(&data, instrument, ts_init) {
345                            Ok(deltas) => {
346                                if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
347                                    OrderBookDeltas_API::new(deltas),
348                                ))) {
349                                    tracing::error!("Failed to send order book deltas: {e}");
350                                }
351                            }
352                            Err(e) => {
353                                tracing::error!(
354                                    "Failed to parse order book deltas for {}: {e}",
355                                    data.coin
356                                );
357                            }
358                        }
359                    }
360                } else {
361                    tracing::warn!("Received L2 book for unknown coin: {coin}");
362                }
363            }
364            HyperliquidWsMessage::Candle { data } => {
365                let coin = &data.s;
366                let interval = &data.i;
367                tracing::debug!("Received candle for {coin}:{interval}");
368
369                if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
370                    let coin = Ustr::from(&data.s);
371                    let coin_map = coin_to_instrument_id.read().unwrap();
372
373                    if let Some(&instrument_id) = coin_map.get(&coin) {
374                        let instruments_map = instruments.read().unwrap();
375                        if let Some(instrument) = instruments_map.get(&instrument_id) {
376                            let ts_init = clock.get_time_ns();
377
378                            match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
379                                Ok(bar) => {
380                                    if let Err(e) =
381                                        data_sender.send(DataEvent::Data(Data::Bar(bar)))
382                                    {
383                                        tracing::error!("Failed to send bar data: {e}");
384                                    }
385                                }
386                                Err(e) => {
387                                    tracing::error!("Failed to parse candle for {coin}: {e}");
388                                }
389                            }
390                        }
391                    } else {
392                        tracing::warn!("Received candle for unknown coin: {coin}");
393                    }
394                } else {
395                    tracing::debug!("Received candle for {coin}:{interval} but no BarType tracked");
396                }
397            }
398            _ => {
399                // Log other message types for debugging
400                tracing::trace!("Received unhandled WebSocket message: {:?}", msg);
401            }
402        }
403    }
404
405    fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
406        let instruments = self.instruments.read().unwrap();
407        instruments
408            .get(instrument_id)
409            .cloned()
410            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
411    }
412}
413
414fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
415    value
416        .and_then(|dt| dt.timestamp_nanos_opt())
417        .and_then(|nanos| u64::try_from(nanos).ok())
418        .map(UnixNanos::from)
419}
420
421impl HyperliquidDataClient {
422    #[allow(dead_code)]
423    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
424        if let Err(e) = sender.send(DataEvent::Data(data)) {
425            tracing::error!("Failed to emit data event: {e}");
426        }
427    }
428}
429
430#[async_trait::async_trait]
431impl DataClient for HyperliquidDataClient {
    /// Returns the client ID this data client was created with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
435
    /// Returns the venue served by this client; always `Some`.
    fn venue(&self) -> Option<Venue> {
        Some(self.venue())
    }
439
440    fn start(&mut self) -> anyhow::Result<()> {
441        tracing::info!(
442            client_id = %self.client_id,
443            is_testnet = self.config.is_testnet,
444            http_proxy_url = ?self.config.http_proxy_url,
445            ws_proxy_url = ?self.config.ws_proxy_url,
446            "Starting Hyperliquid data client"
447        );
448        Ok(())
449    }
450
451    fn stop(&mut self) -> anyhow::Result<()> {
452        tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
453        self.cancellation_token.cancel();
454        self.is_connected.store(false, Ordering::Relaxed);
455        Ok(())
456    }
457
458    fn reset(&mut self) -> anyhow::Result<()> {
459        tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
460        self.is_connected.store(false, Ordering::Relaxed);
461        self.cancellation_token = CancellationToken::new();
462        self.tasks.clear();
463        Ok(())
464    }
465
    /// Disposes of the client by delegating to `stop` after logging.
    fn dispose(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
        self.stop()
    }
470
    /// Returns the current value of the connection flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
474
475    fn is_disconnected(&self) -> bool {
476        !self.is_connected()
477    }
478
479    async fn connect(&mut self) -> anyhow::Result<()> {
480        if self.is_connected() {
481            return Ok(());
482        }
483
484        // Bootstrap instruments from HTTP API
485        let _instruments = self
486            .bootstrap_instruments()
487            .await
488            .context("failed to bootstrap instruments")?;
489
490        // Connect WebSocket client
491        self.spawn_ws()
492            .await
493            .context("failed to spawn WebSocket client")?;
494
495        self.is_connected.store(true, Ordering::Relaxed);
496        tracing::info!(client_id = %self.client_id, "Connected");
497
498        Ok(())
499    }
500
501    async fn disconnect(&mut self) -> anyhow::Result<()> {
502        if !self.is_connected() {
503            return Ok(());
504        }
505
506        // Cancel all tasks
507        self.cancellation_token.cancel();
508
509        // Wait for all tasks to complete
510        for task in self.tasks.drain(..) {
511            if let Err(e) = task.await {
512                tracing::error!("Error waiting for task to complete: {e}");
513            }
514        }
515
516        // Disconnect WebSocket client
517        if let Err(e) = self.ws_client.disconnect().await {
518            tracing::error!("Error disconnecting WebSocket client: {e}");
519        }
520
521        // Clear state
522        {
523            let mut instruments = self.instruments.write().unwrap();
524            instruments.clear();
525        }
526
527        self.is_connected.store(false, Ordering::Relaxed);
528        tracing::info!(client_id = %self.client_id, "Disconnected");
529
530        Ok(())
531    }
532
533    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
534        tracing::debug!("Requesting all instruments");
535
536        let instruments = {
537            let instruments_map = self.instruments.read().unwrap();
538            instruments_map.values().cloned().collect()
539        };
540
541        let response = DataResponse::Instruments(InstrumentsResponse::new(
542            request.request_id,
543            request.client_id.unwrap_or(self.client_id),
544            self.venue(),
545            instruments,
546            datetime_to_unix_nanos(request.start),
547            datetime_to_unix_nanos(request.end),
548            self.clock.get_time_ns(),
549            request.params.clone(),
550        ));
551
552        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
553            tracing::error!("Failed to send instruments response: {e}");
554        }
555
556        Ok(())
557    }
558
559    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
560        tracing::debug!("Requesting instrument: {}", request.instrument_id);
561
562        let instrument = self.get_instrument(&request.instrument_id)?;
563
564        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
565            request.request_id,
566            request.client_id.unwrap_or(self.client_id),
567            instrument.id(),
568            instrument,
569            datetime_to_unix_nanos(request.start),
570            datetime_to_unix_nanos(request.end),
571            self.clock.get_time_ns(),
572            request.params.clone(),
573        )));
574
575        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
576            tracing::error!("Failed to send instrument response: {e}");
577        }
578
579        Ok(())
580    }
581
582    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
583        tracing::debug!("Requesting bars for {}", request.bar_type);
584
585        let http = self.http_client.clone();
586        let sender = self.data_sender.clone();
587        let bar_type = request.bar_type;
588        let start = request.start;
589        let end = request.end;
590        let limit = request.limit.map(|n| n.get() as u32);
591        let request_id = request.request_id;
592        let client_id = request.client_id.unwrap_or(self.client_id);
593        let params = request.params.clone();
594        let clock = self.clock;
595        let start_nanos = datetime_to_unix_nanos(start);
596        let end_nanos = datetime_to_unix_nanos(end);
597        let instruments = Arc::clone(&self.instruments);
598
599        tokio::spawn(async move {
600            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
601                Ok(bars) => {
602                    let response = DataResponse::Bars(BarsResponse::new(
603                        request_id,
604                        client_id,
605                        bar_type,
606                        bars,
607                        start_nanos,
608                        end_nanos,
609                        clock.get_time_ns(),
610                        params,
611                    ));
612                    if let Err(e) = sender.send(DataEvent::Response(response)) {
613                        tracing::error!("Failed to send bars response: {e}");
614                    }
615                }
616                Err(e) => tracing::error!("Bar request failed: {e:?}"),
617            }
618        });
619
620        Ok(())
621    }
622
623    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
624        tracing::debug!("Requesting trades for {}", request.instrument_id);
625
626        // NOTE: Hyperliquid does not provide public historical trade data via REST API
627        // - Real-time trades are available via WebSocket (subscribe_trades)
628        // - User fills (authenticated) are available via generate_fill_reports
629        // For now, return empty response similar to exchanges without public trade history
630        tracing::warn!(
631            "Historical trade data not available via REST on Hyperliquid for {}",
632            request.instrument_id
633        );
634
635        let trades = Vec::new();
636
637        let response = DataResponse::Trades(TradesResponse::new(
638            request.request_id,
639            request.client_id.unwrap_or(self.client_id),
640            request.instrument_id,
641            trades,
642            datetime_to_unix_nanos(request.start),
643            datetime_to_unix_nanos(request.end),
644            self.clock.get_time_ns(),
645            request.params.clone(),
646        ));
647
648        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
649            tracing::error!("Failed to send trades response: {e}");
650        }
651
652        Ok(())
653    }
654
655    fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
656        tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
657
658        let ws = self.ws_client.clone();
659        let instrument_id = subscription.instrument_id;
660
661        tokio::spawn(async move {
662            if let Err(e) = ws.subscribe_trades(instrument_id).await {
663                tracing::error!("Failed to subscribe to trades: {e:?}");
664            }
665        });
666
667        Ok(())
668    }
669
670    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
671        tracing::debug!(
672            "Unsubscribing from trades: {}",
673            unsubscription.instrument_id
674        );
675
676        let ws = self.ws_client.clone();
677        let instrument_id = unsubscription.instrument_id;
678
679        tokio::spawn(async move {
680            if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
681                tracing::error!("Failed to unsubscribe from trades: {e:?}");
682            }
683        });
684
685        tracing::info!(
686            "Unsubscribed from trades for {}",
687            unsubscription.instrument_id
688        );
689
690        Ok(())
691    }
692
693    fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
694        tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
695
696        if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
697            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
698        }
699
700        let ws = self.ws_client.clone();
701        let instrument_id = subscription.instrument_id;
702
703        tokio::spawn(async move {
704            if let Err(e) = ws.subscribe_book(instrument_id).await {
705                tracing::error!("Failed to subscribe to book deltas: {e:?}");
706            }
707        });
708
709        Ok(())
710    }
711
712    fn unsubscribe_book_deltas(
713        &mut self,
714        unsubscription: &UnsubscribeBookDeltas,
715    ) -> anyhow::Result<()> {
716        tracing::debug!(
717            "Unsubscribing from book deltas: {}",
718            unsubscription.instrument_id
719        );
720
721        let ws = self.ws_client.clone();
722        let instrument_id = unsubscription.instrument_id;
723
724        tokio::spawn(async move {
725            if let Err(e) = ws.unsubscribe_book(instrument_id).await {
726                tracing::error!("Failed to unsubscribe from book deltas: {e:?}");
727            }
728        });
729
730        Ok(())
731    }
732
733    fn subscribe_book_snapshots(
734        &mut self,
735        subscription: &SubscribeBookSnapshots,
736    ) -> anyhow::Result<()> {
737        tracing::debug!(
738            "Subscribing to book snapshots: {}",
739            subscription.instrument_id
740        );
741
742        if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
743            anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
744        }
745
746        let ws = self.ws_client.clone();
747        let instrument_id = subscription.instrument_id;
748
749        tokio::spawn(async move {
750            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
751                tracing::error!("Failed to subscribe to book snapshots: {e:?}");
752            }
753        });
754
755        Ok(())
756    }
757
758    fn unsubscribe_book_snapshots(
759        &mut self,
760        unsubscription: &UnsubscribeBookSnapshots,
761    ) -> anyhow::Result<()> {
762        tracing::debug!(
763            "Unsubscribing from book snapshots: {}",
764            unsubscription.instrument_id
765        );
766
767        let ws = self.ws_client.clone();
768        let instrument_id = unsubscription.instrument_id;
769
770        tokio::spawn(async move {
771            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
772                tracing::error!("Failed to unsubscribe from book snapshots: {e:?}");
773            }
774        });
775
776        Ok(())
777    }
778
779    fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
780        tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
781
782        let ws = self.ws_client.clone();
783        let instrument_id = subscription.instrument_id;
784
785        tokio::spawn(async move {
786            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
787                tracing::error!("Failed to subscribe to quotes: {e:?}");
788            }
789        });
790
791        Ok(())
792    }
793
794    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
795        tracing::debug!(
796            "Unsubscribing from quotes: {}",
797            unsubscription.instrument_id
798        );
799
800        let ws = self.ws_client.clone();
801        let instrument_id = unsubscription.instrument_id;
802
803        tokio::spawn(async move {
804            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
805                tracing::error!("Failed to unsubscribe from quotes: {e:?}");
806            }
807        });
808
809        tracing::info!(
810            "Unsubscribed from quotes for {}",
811            unsubscription.instrument_id
812        );
813
814        Ok(())
815    }
816
817    fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
818        tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
819
820        let instruments = self.instruments.read().unwrap();
821        let instrument_id = subscription.bar_type.instrument_id();
822        if !instruments.contains_key(&instrument_id) {
823            anyhow::bail!("Instrument {} not found", instrument_id);
824        }
825
826        drop(instruments);
827
828        let bar_type = subscription.bar_type;
829        let ws = self.ws_client.clone();
830
831        tokio::spawn(async move {
832            if let Err(e) = ws.subscribe_bars(bar_type).await {
833                tracing::error!("Failed to subscribe to bars: {e:?}");
834            }
835        });
836
837        tracing::info!("Subscribed to bars for {}", subscription.bar_type);
838
839        Ok(())
840    }
841
842    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
843        tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
844
845        let bar_type = unsubscription.bar_type;
846        let ws = self.ws_client.clone();
847
848        tokio::spawn(async move {
849            if let Err(e) = ws.unsubscribe_bars(bar_type).await {
850                tracing::error!("Failed to unsubscribe from bars: {e:?}");
851            }
852        });
853
854        tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
855
856        Ok(())
857    }
858}
859
860pub(crate) fn candle_to_bar(
861    candle: &HyperliquidCandle,
862    bar_type: BarType,
863    price_precision: u8,
864    size_precision: u8,
865) -> anyhow::Result<Bar> {
866    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
867    let ts_event = ts_init;
868
869    let open = candle.open.parse::<f64>().context("parse open price")?;
870    let high = candle.high.parse::<f64>().context("parse high price")?;
871    let low = candle.low.parse::<f64>().context("parse low price")?;
872    let close = candle.close.parse::<f64>().context("parse close price")?;
873    let volume = candle.volume.parse::<f64>().context("parse volume")?;
874
875    Ok(Bar::new(
876        bar_type,
877        Price::new(open, price_precision),
878        Price::new(high, price_precision),
879        Price::new(low, price_precision),
880        Price::new(close, price_precision),
881        Quantity::new(volume, size_precision),
882        ts_event,
883        ts_init,
884    ))
885}
886
887/// Request bars from HTTP API.
888async fn request_bars_from_http(
889    http_client: HyperliquidHttpClient,
890    bar_type: BarType,
891    start: Option<DateTime<Utc>>,
892    end: Option<DateTime<Utc>>,
893    limit: Option<u32>,
894    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
895) -> anyhow::Result<Vec<Bar>> {
896    // Get instrument details for precision
897    let instrument_id = bar_type.instrument_id();
898    let instrument = {
899        let guard = instruments.read().unwrap();
900        guard
901            .get(&instrument_id)
902            .cloned()
903            .context("instrument not found in cache")?
904    };
905
906    let price_precision = instrument.price_precision();
907    let size_precision = instrument.size_precision();
908
909    // Extract coin symbol from instrument ID (e.g., "BTC-PERP.HYPERLIQUID" -> "BTC")
910    let coin = instrument_id
911        .symbol
912        .as_str()
913        .split('-')
914        .next()
915        .context("invalid instrument symbol")?;
916
917    let interval = bar_type_to_interval(&bar_type)?;
918
919    // Hyperliquid uses millisecond timestamps
920    let now = Utc::now();
921    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
922    let start_time = if let Some(start) = start {
923        start.timestamp_millis() as u64
924    } else {
925        // Default to 1000 bars before end_time
926        let spec = bar_type.spec();
927        let step_ms = match spec.aggregation {
928            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
929            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
930            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
931            _ => 60_000,
932        };
933        end_time.saturating_sub(1000 * step_ms)
934    };
935
936    let candles = http_client
937        .info_candle_snapshot(coin, interval, start_time, end_time)
938        .await
939        .context("failed to fetch candle snapshot from Hyperliquid")?;
940
941    let mut bars: Vec<Bar> = candles
942        .iter()
943        .filter_map(|candle| {
944            candle_to_bar(candle, bar_type, price_precision, size_precision)
945                .map_err(|e| {
946                    tracing::warn!("Failed to convert candle to bar: {e}");
947                    e
948                })
949                .ok()
950        })
951        .collect();
952
953    if let Some(limit) = limit
954        && bars.len() > limit as usize
955    {
956        bars = bars.into_iter().take(limit as usize).collect();
957    }
958
959    tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
960    Ok(bars)
961}