Skip to main content

nautilus_hyperliquid/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc, RwLock,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    clients::DataClient,
26    live::{runner::get_data_event_sender, runtime::get_runtime},
27    messages::{
28        DataEvent,
29        data::{
30            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
31            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
32            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
33            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
34            UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
35            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
36        },
37    },
38};
39use nautilus_core::{
40    UnixNanos,
41    datetime::datetime_to_unix_nanos,
42    time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45    data::{Bar, BarType, Data, OrderBookDeltas_API},
46    enums::{BarAggregation, BookType},
47    identifiers::{ClientId, InstrumentId, Venue},
48    instruments::{Instrument, InstrumentAny},
49    types::{Price, Quantity},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53use ustr::Ustr;
54
55use crate::{
56    common::{consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
57    config::HyperliquidDataClientConfig,
58    http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
59    websocket::{
60        client::HyperliquidWebSocketClient,
61        messages::{HyperliquidWsMessage, NautilusWsMessage},
62        parse::{
63            parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
64        },
65    },
66};
67
68#[derive(Debug)]
69pub struct HyperliquidDataClient {
70    client_id: ClientId,
71    #[allow(dead_code)]
72    config: HyperliquidDataClientConfig,
73    http_client: HyperliquidHttpClient,
74    ws_client: HyperliquidWebSocketClient,
75    is_connected: AtomicBool,
76    cancellation_token: CancellationToken,
77    tasks: Vec<JoinHandle<()>>,
78    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
79    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
80    /// Maps coin symbols (e.g., "BTC") to instrument IDs (e.g., "BTC-PERP")
81    /// for efficient O(1) lookup in WebSocket message handlers
82    coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
83    clock: &'static AtomicTime,
84    #[allow(dead_code)]
85    instrument_refresh_active: bool,
86}
87
88impl HyperliquidDataClient {
89    /// Creates a new [`HyperliquidDataClient`] instance.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the HTTP client fails to initialize.
94    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
95        let clock = get_atomic_clock_realtime();
96        let data_sender = get_data_event_sender();
97
98        let http_client = if let Some(private_key_str) = &config.private_key {
99            let secrets = crate::common::credential::Secrets {
100                private_key: crate::common::credential::EvmPrivateKey::new(
101                    private_key_str.clone(),
102                )?,
103                is_testnet: config.is_testnet,
104                vault_address: None,
105            };
106            HyperliquidHttpClient::with_secrets(
107                &secrets,
108                config.http_timeout_secs,
109                config.http_proxy_url.clone(),
110            )?
111        } else {
112            HyperliquidHttpClient::new(
113                config.is_testnet,
114                config.http_timeout_secs,
115                config.http_proxy_url.clone(),
116            )?
117        };
118
119        // Note: Rust data client is not the primary interface; Python adapter is used instead.
120        let ws_client = HyperliquidWebSocketClient::new(None, config.is_testnet, None);
121
122        Ok(Self {
123            client_id,
124            config,
125            http_client,
126            ws_client,
127            is_connected: AtomicBool::new(false),
128            cancellation_token: CancellationToken::new(),
129            tasks: Vec::new(),
130            data_sender,
131            instruments: Arc::new(RwLock::new(AHashMap::new())),
132            coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
133            clock,
134            instrument_refresh_active: false,
135        })
136    }
137
138    fn venue(&self) -> Venue {
139        *HYPERLIQUID_VENUE
140    }
141
142    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
143        let instruments = self
144            .http_client
145            .request_instruments()
146            .await
147            .context("failed to fetch instruments during bootstrap")?;
148
149        let mut instruments_map = self.instruments.write().unwrap();
150        let mut coin_map = self.coin_to_instrument_id.write().unwrap();
151
152        for instrument in &instruments {
153            let instrument_id = instrument.id();
154            instruments_map.insert(instrument_id, instrument.clone());
155
156            // Build coin-to-instrument-id index for efficient WebSocket message lookup
157            // Use raw_symbol which contains Hyperliquid's coin ticker (e.g., "BTC")
158            let coin = instrument.raw_symbol().inner();
159            if instrument_id.symbol.as_str().starts_with("BTCUSD") {
160                log::warn!(
161                    "DEBUG bootstrap BTCUSD: instrument_id={}, raw_symbol={}, coin={}",
162                    instrument_id,
163                    instrument.raw_symbol(),
164                    coin
165                );
166            }
167            coin_map.insert(coin, instrument_id);
168
169            self.ws_client.cache_instrument(instrument.clone());
170        }
171
172        log::info!(
173            "Bootstrapped {} instruments with {} coin mappings",
174            instruments_map.len(),
175            coin_map.len()
176        );
177        Ok(instruments)
178    }
179
180    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
181        // Clone client before connecting so the clone can have out_rx set
182        let mut ws_client = self.ws_client.clone();
183
184        ws_client
185            .connect()
186            .await
187            .context("failed to connect to Hyperliquid WebSocket")?;
188
189        let data_sender = self.data_sender.clone();
190        let cancellation_token = self.cancellation_token.clone();
191
192        let task = get_runtime().spawn(async move {
193            log::info!("Hyperliquid WebSocket consumption loop started");
194
195            loop {
196                tokio::select! {
197                    () = cancellation_token.cancelled() => {
198                        log::info!("WebSocket consumption loop cancelled");
199                        break;
200                    }
201                    msg_opt = ws_client.next_event() => {
202                        if let Some(msg) = msg_opt {
203                            match msg {
204                                NautilusWsMessage::Trades(trades) => {
205                                    for trade in trades {
206                                        if let Err(e) = data_sender
207                                            .send(DataEvent::Data(Data::Trade(trade)))
208                                        {
209                                            log::error!("Failed to send trade tick: {e}");
210                                        }
211                                    }
212                                }
213                                NautilusWsMessage::Quote(quote) => {
214                                    if let Err(e) = data_sender
215                                        .send(DataEvent::Data(Data::Quote(quote)))
216                                    {
217                                        log::error!("Failed to send quote tick: {e}");
218                                    }
219                                }
220                                NautilusWsMessage::Deltas(deltas) => {
221                                    if let Err(e) = data_sender
222                                        .send(DataEvent::Data(Data::Deltas(
223                                            OrderBookDeltas_API::new(deltas),
224                                        )))
225                                    {
226                                        log::error!("Failed to send order book deltas: {e}");
227                                    }
228                                }
229                                NautilusWsMessage::Candle(bar) => {
230                                    if let Err(e) = data_sender
231                                        .send(DataEvent::Data(Data::Bar(bar)))
232                                    {
233                                        log::error!("Failed to send bar: {e}");
234                                    }
235                                }
236                                NautilusWsMessage::MarkPrice(update) => {
237                                    if let Err(e) = data_sender
238                                        .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
239                                    {
240                                        log::error!("Failed to send mark price update: {e}");
241                                    }
242                                }
243                                NautilusWsMessage::IndexPrice(update) => {
244                                    if let Err(e) = data_sender
245                                        .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
246                                    {
247                                        log::error!("Failed to send index price update: {e}");
248                                    }
249                                }
250                                NautilusWsMessage::FundingRate(update) => {
251                                    if let Err(e) = data_sender
252                                        .send(DataEvent::FundingRate(update))
253                                    {
254                                        log::error!("Failed to send funding rate update: {e}");
255                                    }
256                                }
257                                NautilusWsMessage::Reconnected => {
258                                    log::info!("WebSocket reconnected");
259                                }
260                                NautilusWsMessage::Error(e) => {
261                                    log::error!("WebSocket error: {e}");
262                                }
263                                NautilusWsMessage::ExecutionReports(_) => {
264                                    // Handled by execution client
265                                }
266                            }
267                        } else {
268                            // Connection closed or error
269                            log::warn!("WebSocket next_event returned None, connection may be closed");
270                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
271                        }
272                    }
273                }
274            }
275
276            log::info!("Hyperliquid WebSocket consumption loop finished");
277        });
278
279        self.tasks.push(task);
280        log::info!("WebSocket consumption task spawned");
281
282        Ok(())
283    }
284
285    #[allow(dead_code)]
286    fn handle_ws_message(
287        msg: HyperliquidWsMessage,
288        ws_client: &HyperliquidWebSocketClient,
289        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
290        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
291        coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
292        _venue: Venue,
293        clock: &'static AtomicTime,
294    ) {
295        match msg {
296            HyperliquidWsMessage::Bbo { data } => {
297                let coin = data.coin;
298                log::debug!("Received BBO message for coin: {coin}");
299
300                // Use efficient O(1) lookup instead of iterating through all instruments
301                // Hyperliquid WebSocket sends coin="BTC", lookup returns "BTC-PERP" instrument ID
302                let coin_map = coin_to_instrument_id.read().unwrap();
303                let instrument_id = coin_map.get(&data.coin);
304
305                if let Some(&instrument_id) = instrument_id {
306                    let instruments_map = instruments.read().unwrap();
307                    if let Some(instrument) = instruments_map.get(&instrument_id) {
308                        let ts_init = clock.get_time_ns();
309
310                        match parse_ws_quote_tick(&data, instrument, ts_init) {
311                            Ok(quote_tick) => {
312                                log::debug!(
313                                    "Parsed quote tick for {}: bid={}, ask={}",
314                                    data.coin,
315                                    quote_tick.bid_price,
316                                    quote_tick.ask_price
317                                );
318                                if let Err(e) =
319                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
320                                {
321                                    log::error!("Failed to send quote tick: {e}");
322                                }
323                            }
324                            Err(e) => {
325                                log::error!("Failed to parse quote tick for {}: {e}", data.coin);
326                            }
327                        }
328                    }
329                } else {
330                    log::warn!(
331                        "Received BBO for unknown coin: {} (no matching instrument found)",
332                        data.coin
333                    );
334                }
335            }
336            HyperliquidWsMessage::Trades { data } => {
337                let count = data.len();
338                log::debug!("Received {count} trade(s)");
339
340                // Process each trade in the batch
341                for trade_data in data {
342                    let coin = trade_data.coin;
343                    let coin_map = coin_to_instrument_id.read().unwrap();
344
345                    if let Some(&instrument_id) = coin_map.get(&coin) {
346                        let instruments_map = instruments.read().unwrap();
347                        if let Some(instrument) = instruments_map.get(&instrument_id) {
348                            let ts_init = clock.get_time_ns();
349
350                            match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
351                                Ok(trade_tick) => {
352                                    if let Err(e) =
353                                        data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
354                                    {
355                                        log::error!("Failed to send trade tick: {e}");
356                                    }
357                                }
358                                Err(e) => {
359                                    log::error!("Failed to parse trade tick for {coin}: {e}");
360                                }
361                            }
362                        }
363                    } else {
364                        log::warn!("Received trade for unknown coin: {coin}");
365                    }
366                }
367            }
368            HyperliquidWsMessage::L2Book { data } => {
369                let coin = data.coin;
370                log::debug!("Received L2 book update for coin: {coin}");
371
372                let coin_map = coin_to_instrument_id.read().unwrap();
373                if let Some(&instrument_id) = coin_map.get(&data.coin) {
374                    let instruments_map = instruments.read().unwrap();
375                    if let Some(instrument) = instruments_map.get(&instrument_id) {
376                        let ts_init = clock.get_time_ns();
377
378                        match parse_ws_order_book_deltas(&data, instrument, ts_init) {
379                            Ok(deltas) => {
380                                if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
381                                    OrderBookDeltas_API::new(deltas),
382                                ))) {
383                                    log::error!("Failed to send order book deltas: {e}");
384                                }
385                            }
386                            Err(e) => {
387                                log::error!(
388                                    "Failed to parse order book deltas for {}: {e}",
389                                    data.coin
390                                );
391                            }
392                        }
393                    }
394                } else {
395                    log::warn!("Received L2 book for unknown coin: {coin}");
396                }
397            }
398            HyperliquidWsMessage::Candle { data } => {
399                let coin = &data.s;
400                let interval = &data.i;
401                log::debug!("Received candle for {coin}:{interval}");
402
403                if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
404                    let coin = Ustr::from(&data.s);
405                    let coin_map = coin_to_instrument_id.read().unwrap();
406
407                    if let Some(&instrument_id) = coin_map.get(&coin) {
408                        let instruments_map = instruments.read().unwrap();
409                        if let Some(instrument) = instruments_map.get(&instrument_id) {
410                            let ts_init = clock.get_time_ns();
411
412                            match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
413                                Ok(bar) => {
414                                    if let Err(e) =
415                                        data_sender.send(DataEvent::Data(Data::Bar(bar)))
416                                    {
417                                        log::error!("Failed to send bar data: {e}");
418                                    }
419                                }
420                                Err(e) => {
421                                    log::error!("Failed to parse candle for {coin}: {e}");
422                                }
423                            }
424                        }
425                    } else {
426                        log::warn!("Received candle for unknown coin: {coin}");
427                    }
428                } else {
429                    log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
430                }
431            }
432            _ => {
433                // Log other message types for debugging
434                log::trace!("Received unhandled WebSocket message: {msg:?}");
435            }
436        }
437    }
438
439    fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
440        let instruments = self.instruments.read().unwrap();
441        instruments
442            .get(instrument_id)
443            .cloned()
444            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
445    }
446}
447
448impl HyperliquidDataClient {
449    #[allow(dead_code)]
450    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
451        if let Err(e) = sender.send(DataEvent::Data(data)) {
452            log::error!("Failed to emit data event: {e}");
453        }
454    }
455}
456
457#[async_trait::async_trait(?Send)]
458impl DataClient for HyperliquidDataClient {
459    fn client_id(&self) -> ClientId {
460        self.client_id
461    }
462
463    fn venue(&self) -> Option<Venue> {
464        Some(self.venue())
465    }
466
467    fn start(&mut self) -> anyhow::Result<()> {
468        log::info!(
469            "Starting Hyperliquid data client: client_id={}, is_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
470            self.client_id,
471            self.config.is_testnet,
472            self.config.http_proxy_url,
473            self.config.ws_proxy_url,
474        );
475        Ok(())
476    }
477
478    fn stop(&mut self) -> anyhow::Result<()> {
479        log::info!("Stopping Hyperliquid data client {}", self.client_id);
480        self.cancellation_token.cancel();
481        self.is_connected.store(false, Ordering::Relaxed);
482        Ok(())
483    }
484
485    fn reset(&mut self) -> anyhow::Result<()> {
486        log::debug!("Resetting Hyperliquid data client {}", self.client_id);
487        self.is_connected.store(false, Ordering::Relaxed);
488        self.cancellation_token = CancellationToken::new();
489        self.tasks.clear();
490        Ok(())
491    }
492
493    fn dispose(&mut self) -> anyhow::Result<()> {
494        log::debug!("Disposing Hyperliquid data client {}", self.client_id);
495        self.stop()
496    }
497
498    fn is_connected(&self) -> bool {
499        self.is_connected.load(Ordering::Acquire)
500    }
501
502    fn is_disconnected(&self) -> bool {
503        !self.is_connected()
504    }
505
506    async fn connect(&mut self) -> anyhow::Result<()> {
507        if self.is_connected() {
508            return Ok(());
509        }
510
511        // Bootstrap instruments from HTTP API
512        let _instruments = self
513            .bootstrap_instruments()
514            .await
515            .context("failed to bootstrap instruments")?;
516
517        // Connect WebSocket client
518        self.spawn_ws()
519            .await
520            .context("failed to spawn WebSocket client")?;
521
522        self.is_connected.store(true, Ordering::Relaxed);
523        log::info!("Connected: client_id={}", self.client_id);
524
525        Ok(())
526    }
527
528    async fn disconnect(&mut self) -> anyhow::Result<()> {
529        if !self.is_connected() {
530            return Ok(());
531        }
532
533        // Cancel all tasks
534        self.cancellation_token.cancel();
535
536        // Wait for all tasks to complete
537        for task in self.tasks.drain(..) {
538            if let Err(e) = task.await {
539                log::error!("Error waiting for task to complete: {e}");
540            }
541        }
542
543        // Disconnect WebSocket client
544        if let Err(e) = self.ws_client.disconnect().await {
545            log::error!("Error disconnecting WebSocket client: {e}");
546        }
547
548        // Clear state
549        {
550            let mut instruments = self.instruments.write().unwrap();
551            instruments.clear();
552        }
553
554        self.is_connected.store(false, Ordering::Relaxed);
555        log::info!("Disconnected: client_id={}", self.client_id);
556
557        Ok(())
558    }
559
560    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
561        log::debug!("Requesting all instruments");
562
563        let instruments = {
564            let instruments_map = self.instruments.read().unwrap();
565            instruments_map.values().cloned().collect()
566        };
567
568        let response = DataResponse::Instruments(InstrumentsResponse::new(
569            request.request_id,
570            request.client_id.unwrap_or(self.client_id),
571            self.venue(),
572            instruments,
573            datetime_to_unix_nanos(request.start),
574            datetime_to_unix_nanos(request.end),
575            self.clock.get_time_ns(),
576            request.params,
577        ));
578
579        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
580            log::error!("Failed to send instruments response: {e}");
581        }
582
583        Ok(())
584    }
585
586    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
587        log::debug!("Requesting instrument: {}", request.instrument_id);
588
589        let instrument = self.get_instrument(&request.instrument_id)?;
590
591        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
592            request.request_id,
593            request.client_id.unwrap_or(self.client_id),
594            instrument.id(),
595            instrument,
596            datetime_to_unix_nanos(request.start),
597            datetime_to_unix_nanos(request.end),
598            self.clock.get_time_ns(),
599            request.params,
600        )));
601
602        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
603            log::error!("Failed to send instrument response: {e}");
604        }
605
606        Ok(())
607    }
608
609    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
610        log::debug!("Requesting bars for {}", request.bar_type);
611
612        let http = self.http_client.clone();
613        let sender = self.data_sender.clone();
614        let bar_type = request.bar_type;
615        let start = request.start;
616        let end = request.end;
617        let limit = request.limit.map(|n| n.get() as u32);
618        let request_id = request.request_id;
619        let client_id = request.client_id.unwrap_or(self.client_id);
620        let params = request.params;
621        let clock = self.clock;
622        let start_nanos = datetime_to_unix_nanos(start);
623        let end_nanos = datetime_to_unix_nanos(end);
624        let instruments = Arc::clone(&self.instruments);
625
626        get_runtime().spawn(async move {
627            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
628                Ok(bars) => {
629                    let response = DataResponse::Bars(BarsResponse::new(
630                        request_id,
631                        client_id,
632                        bar_type,
633                        bars,
634                        start_nanos,
635                        end_nanos,
636                        clock.get_time_ns(),
637                        params,
638                    ));
639                    if let Err(e) = sender.send(DataEvent::Response(response)) {
640                        log::error!("Failed to send bars response: {e}");
641                    }
642                }
643                Err(e) => log::error!("Bar request failed: {e:?}"),
644            }
645        });
646
647        Ok(())
648    }
649
650    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
651        log::debug!("Requesting trades for {}", request.instrument_id);
652
653        // NOTE: Hyperliquid does not provide public historical trade data via REST API
654        // - Real-time trades are available via WebSocket (subscribe_trades)
655        // - User fills (authenticated) are available via generate_fill_reports
656        // For now, return empty response similar to exchanges without public trade history
657        log::warn!(
658            "Historical trade data not available via REST on Hyperliquid for {}",
659            request.instrument_id
660        );
661
662        let trades = Vec::new();
663
664        let response = DataResponse::Trades(TradesResponse::new(
665            request.request_id,
666            request.client_id.unwrap_or(self.client_id),
667            request.instrument_id,
668            trades,
669            datetime_to_unix_nanos(request.start),
670            datetime_to_unix_nanos(request.end),
671            self.clock.get_time_ns(),
672            request.params,
673        ));
674
675        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
676            log::error!("Failed to send trades response: {e}");
677        }
678
679        Ok(())
680    }
681
682    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
683        let instruments = self.instruments.read().unwrap();
684        if let Some(instrument) = instruments.get(&cmd.instrument_id) {
685            if let Err(e) = self
686                .data_sender
687                .send(DataEvent::Instrument(instrument.clone()))
688            {
689                log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
690            }
691        } else {
692            log::warn!("Instrument {} not found in cache", cmd.instrument_id);
693        }
694        Ok(())
695    }
696
697    fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
698        log::debug!("Subscribing to trades: {}", subscription.instrument_id);
699
700        let ws = self.ws_client.clone();
701        let instrument_id = subscription.instrument_id;
702
703        get_runtime().spawn(async move {
704            if let Err(e) = ws.subscribe_trades(instrument_id).await {
705                log::error!("Failed to subscribe to trades: {e:?}");
706            }
707        });
708
709        Ok(())
710    }
711
712    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
713        log::debug!(
714            "Unsubscribing from trades: {}",
715            unsubscription.instrument_id
716        );
717
718        let ws = self.ws_client.clone();
719        let instrument_id = unsubscription.instrument_id;
720
721        get_runtime().spawn(async move {
722            if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
723                log::error!("Failed to unsubscribe from trades: {e:?}");
724            }
725        });
726
727        Ok(())
728    }
729
730    fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
731        log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
732
733        if subscription.book_type != BookType::L2_MBP {
734            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
735        }
736
737        let ws = self.ws_client.clone();
738        let instrument_id = subscription.instrument_id;
739
740        get_runtime().spawn(async move {
741            if let Err(e) = ws.subscribe_book(instrument_id).await {
742                log::error!("Failed to subscribe to book deltas: {e:?}");
743            }
744        });
745
746        Ok(())
747    }
748
749    fn unsubscribe_book_deltas(
750        &mut self,
751        unsubscription: &UnsubscribeBookDeltas,
752    ) -> anyhow::Result<()> {
753        log::debug!(
754            "Unsubscribing from book deltas: {}",
755            unsubscription.instrument_id
756        );
757
758        let ws = self.ws_client.clone();
759        let instrument_id = unsubscription.instrument_id;
760
761        get_runtime().spawn(async move {
762            if let Err(e) = ws.unsubscribe_book(instrument_id).await {
763                log::error!("Failed to unsubscribe from book deltas: {e:?}");
764            }
765        });
766
767        Ok(())
768    }
769
770    fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
771        log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
772
773        let ws = self.ws_client.clone();
774        let instrument_id = subscription.instrument_id;
775
776        get_runtime().spawn(async move {
777            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
778                log::error!("Failed to subscribe to quotes: {e:?}");
779            }
780        });
781
782        Ok(())
783    }
784
785    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
786        log::debug!(
787            "Unsubscribing from quotes: {}",
788            unsubscription.instrument_id
789        );
790
791        let ws = self.ws_client.clone();
792        let instrument_id = unsubscription.instrument_id;
793
794        get_runtime().spawn(async move {
795            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
796                log::error!("Failed to unsubscribe from quotes: {e:?}");
797            }
798        });
799
800        Ok(())
801    }
802
803    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
804        let ws = self.ws_client.clone();
805        let instrument_id = cmd.instrument_id;
806
807        get_runtime().spawn(async move {
808            if let Err(e) = ws.subscribe_mark_prices(instrument_id).await {
809                log::error!("Failed to subscribe to mark prices: {e:?}");
810            }
811        });
812
813        Ok(())
814    }
815
816    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
817        let ws = self.ws_client.clone();
818        let instrument_id = cmd.instrument_id;
819
820        get_runtime().spawn(async move {
821            if let Err(e) = ws.unsubscribe_mark_prices(instrument_id).await {
822                log::error!("Failed to unsubscribe from mark prices: {e:?}");
823            }
824        });
825
826        Ok(())
827    }
828
829    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
830        let ws = self.ws_client.clone();
831        let instrument_id = cmd.instrument_id;
832
833        get_runtime().spawn(async move {
834            if let Err(e) = ws.subscribe_index_prices(instrument_id).await {
835                log::error!("Failed to subscribe to index prices: {e:?}");
836            }
837        });
838
839        Ok(())
840    }
841
842    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
843        let ws = self.ws_client.clone();
844        let instrument_id = cmd.instrument_id;
845
846        get_runtime().spawn(async move {
847            if let Err(e) = ws.unsubscribe_index_prices(instrument_id).await {
848                log::error!("Failed to unsubscribe from index prices: {e:?}");
849            }
850        });
851
852        Ok(())
853    }
854
855    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
856        let ws = self.ws_client.clone();
857        let instrument_id = cmd.instrument_id;
858
859        get_runtime().spawn(async move {
860            if let Err(e) = ws.subscribe_funding_rates(instrument_id).await {
861                log::error!("Failed to subscribe to funding rates: {e:?}");
862            }
863        });
864
865        Ok(())
866    }
867
868    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
869        let ws = self.ws_client.clone();
870        let instrument_id = cmd.instrument_id;
871
872        get_runtime().spawn(async move {
873            if let Err(e) = ws.unsubscribe_funding_rates(instrument_id).await {
874                log::error!("Failed to unsubscribe from funding rates: {e:?}");
875            }
876        });
877
878        Ok(())
879    }
880
881    fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
882        log::debug!("Subscribing to bars: {}", subscription.bar_type);
883
884        let instruments = self.instruments.read().unwrap();
885        let instrument_id = subscription.bar_type.instrument_id();
886        if !instruments.contains_key(&instrument_id) {
887            anyhow::bail!("Instrument {instrument_id} not found");
888        }
889
890        drop(instruments);
891
892        let bar_type = subscription.bar_type;
893        let ws = self.ws_client.clone();
894
895        get_runtime().spawn(async move {
896            if let Err(e) = ws.subscribe_bars(bar_type).await {
897                log::error!("Failed to subscribe to bars: {e:?}");
898            }
899        });
900
901        Ok(())
902    }
903
904    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
905        log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
906
907        let bar_type = unsubscription.bar_type;
908        let ws = self.ws_client.clone();
909
910        get_runtime().spawn(async move {
911            if let Err(e) = ws.unsubscribe_bars(bar_type).await {
912                log::error!("Failed to unsubscribe from bars: {e:?}");
913            }
914        });
915
916        Ok(())
917    }
918}
919
920pub(crate) fn candle_to_bar(
921    candle: &HyperliquidCandle,
922    bar_type: BarType,
923    price_precision: u8,
924    size_precision: u8,
925) -> anyhow::Result<Bar> {
926    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
927    let ts_event = ts_init;
928
929    let open = candle.open.parse::<f64>().context("parse open price")?;
930    let high = candle.high.parse::<f64>().context("parse high price")?;
931    let low = candle.low.parse::<f64>().context("parse low price")?;
932    let close = candle.close.parse::<f64>().context("parse close price")?;
933    let volume = candle.volume.parse::<f64>().context("parse volume")?;
934
935    Ok(Bar::new(
936        bar_type,
937        Price::new(open, price_precision),
938        Price::new(high, price_precision),
939        Price::new(low, price_precision),
940        Price::new(close, price_precision),
941        Quantity::new(volume, size_precision),
942        ts_event,
943        ts_init,
944    ))
945}
946
947/// Request bars from HTTP API.
948async fn request_bars_from_http(
949    http_client: HyperliquidHttpClient,
950    bar_type: BarType,
951    start: Option<DateTime<Utc>>,
952    end: Option<DateTime<Utc>>,
953    limit: Option<u32>,
954    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
955) -> anyhow::Result<Vec<Bar>> {
956    // Get instrument details for precision
957    let instrument_id = bar_type.instrument_id();
958    let instrument = {
959        let guard = instruments.read().unwrap();
960        guard
961            .get(&instrument_id)
962            .cloned()
963            .context("instrument not found in cache")?
964    };
965
966    let price_precision = instrument.price_precision();
967    let size_precision = instrument.size_precision();
968    let raw_symbol = instrument.raw_symbol();
969    let coin = raw_symbol.as_str();
970
971    let interval = bar_type_to_interval(&bar_type)?;
972
973    // Hyperliquid uses millisecond timestamps
974    let now = Utc::now();
975    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
976    let start_time = if let Some(start) = start {
977        start.timestamp_millis() as u64
978    } else {
979        // Default to 1000 bars before end_time
980        let spec = bar_type.spec();
981        let step_ms = match spec.aggregation {
982            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
983            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
984            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
985            _ => 60_000,
986        };
987        end_time.saturating_sub(1000 * step_ms)
988    };
989
990    let candles = http_client
991        .info_candle_snapshot(coin, interval, start_time, end_time)
992        .await
993        .context("failed to fetch candle snapshot from Hyperliquid")?;
994
995    let mut bars: Vec<Bar> = candles
996        .iter()
997        .filter_map(|candle| {
998            candle_to_bar(candle, bar_type, price_precision, size_precision)
999                .map_err(|e| {
1000                    log::warn!("Failed to convert candle to bar: {e}");
1001                    e
1002                })
1003                .ok()
1004        })
1005        .collect();
1006
1007    if let Some(limit) = limit
1008        && bars.len() > limit as usize
1009    {
1010        bars = bars.into_iter().take(limit as usize).collect();
1011    }
1012
1013    log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1014    Ok(bars)
1015}