nautilus_hyperliquid/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc, RwLock,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    messages::{
26        DataEvent,
27        data::{
28            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
29            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
30            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
31            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
32            UnsubscribeQuotes, UnsubscribeTrades,
33        },
34    },
35    runner::get_data_event_sender,
36};
37use nautilus_core::{
38    UnixNanos,
39    time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_data::client::DataClient;
42use nautilus_model::{
43    data::{Bar, BarType, Data},
44    enums::{AggregationSource, BarAggregation},
45    identifiers::{ClientId, InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47    types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54    common::consts::{HYPERLIQUID_TESTNET_WS_URL, HYPERLIQUID_VENUE, HYPERLIQUID_WS_URL},
55    config::HyperliquidDataClientConfig,
56    http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57    websocket::client::HyperliquidWebSocketClient,
58};
59
60#[derive(Debug)]
61pub struct HyperliquidDataClient {
62    client_id: ClientId,
63    #[allow(dead_code)]
64    config: HyperliquidDataClientConfig,
65    http_client: HyperliquidHttpClient,
66    ws_client: HyperliquidWebSocketClient,
67    is_connected: AtomicBool,
68    cancellation_token: CancellationToken,
69    tasks: Vec<JoinHandle<()>>,
70    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
71    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
72    clock: &'static AtomicTime,
73    #[allow(dead_code)]
74    instrument_refresh_active: bool,
75}
76
77impl HyperliquidDataClient {
78    /// Creates a new [`HyperliquidDataClient`] instance.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the HTTP client fails to initialize.
83    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
84        let clock = get_atomic_clock_realtime();
85        let data_sender = get_data_event_sender();
86
87        let http_client = if let Some(private_key_str) = &config.private_key {
88            let secrets = crate::common::credential::Secrets {
89                private_key: crate::common::credential::EvmPrivateKey::new(
90                    private_key_str.clone(),
91                )?,
92                is_testnet: config.is_testnet,
93                vault_address: None,
94            };
95            HyperliquidHttpClient::with_credentials(&secrets, config.http_timeout_secs)
96        } else {
97            HyperliquidHttpClient::new(config.is_testnet, config.http_timeout_secs)
98        };
99
100        let ws_url = if config.is_testnet {
101            HYPERLIQUID_TESTNET_WS_URL
102        } else {
103            HYPERLIQUID_WS_URL
104        };
105        let ws_client = HyperliquidWebSocketClient::new(ws_url.to_string());
106
107        Ok(Self {
108            client_id,
109            config,
110            http_client,
111            ws_client,
112            is_connected: AtomicBool::new(false),
113            cancellation_token: CancellationToken::new(),
114            tasks: Vec::new(),
115            data_sender,
116            instruments: Arc::new(RwLock::new(AHashMap::new())),
117            clock,
118            instrument_refresh_active: false,
119        })
120    }
121
122    fn venue(&self) -> Venue {
123        *HYPERLIQUID_VENUE
124    }
125
126    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
127        let instruments = self
128            .http_client
129            .request_instruments()
130            .await
131            .context("Failed to fetch instruments during bootstrap")?;
132
133        let mut instruments_map = self.instruments.write().unwrap();
134        for instrument in &instruments {
135            instruments_map.insert(instrument.id(), instrument.clone());
136        }
137
138        tracing::info!("Bootstrapped {} instruments", instruments_map.len());
139        Ok(instruments)
140    }
141
142    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
143        self.ws_client
144            .ensure_connected()
145            .await
146            .context("Failed to connect to Hyperliquid WebSocket")?;
147
148        Ok(())
149    }
150
151    fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
152        let instruments = self.instruments.read().unwrap();
153        instruments
154            .get(instrument_id)
155            .cloned()
156            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
157    }
158}
159
160fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
161    value
162        .and_then(|dt| dt.timestamp_nanos_opt())
163        .and_then(|nanos| u64::try_from(nanos).ok())
164        .map(UnixNanos::from)
165}
166
167impl HyperliquidDataClient {
168    #[allow(dead_code)]
169    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
170        if let Err(err) = sender.send(DataEvent::Data(data)) {
171            tracing::error!("Failed to emit data event: {err}");
172        }
173    }
174}
175
176#[async_trait::async_trait]
177impl DataClient for HyperliquidDataClient {
178    fn client_id(&self) -> ClientId {
179        self.client_id
180    }
181
182    fn venue(&self) -> Option<Venue> {
183        Some(self.venue())
184    }
185
186    fn start(&mut self) -> anyhow::Result<()> {
187        tracing::info!("Starting Hyperliquid data client {}", self.client_id);
188        Ok(())
189    }
190
191    fn stop(&mut self) -> anyhow::Result<()> {
192        tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
193        self.cancellation_token.cancel();
194        self.is_connected.store(false, Ordering::Relaxed);
195        Ok(())
196    }
197
198    fn reset(&mut self) -> anyhow::Result<()> {
199        tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
200        self.is_connected.store(false, Ordering::Relaxed);
201        self.cancellation_token = CancellationToken::new();
202        self.tasks.clear();
203        Ok(())
204    }
205
206    fn dispose(&mut self) -> anyhow::Result<()> {
207        tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
208        self.stop()
209    }
210
211    fn is_connected(&self) -> bool {
212        self.is_connected.load(Ordering::Acquire)
213    }
214
215    fn is_disconnected(&self) -> bool {
216        !self.is_connected()
217    }
218
219    async fn connect(&mut self) -> anyhow::Result<()> {
220        if self.is_connected() {
221            return Ok(());
222        }
223
224        // Bootstrap instruments from HTTP API
225        let _instruments = self
226            .bootstrap_instruments()
227            .await
228            .context("Failed to bootstrap instruments")?;
229
230        // Connect WebSocket client
231        self.spawn_ws()
232            .await
233            .context("Failed to spawn WebSocket client")?;
234
235        self.is_connected.store(true, Ordering::Relaxed);
236        tracing::info!("Hyperliquid data client connected");
237
238        Ok(())
239    }
240
241    async fn disconnect(&mut self) -> anyhow::Result<()> {
242        if !self.is_connected() {
243            return Ok(());
244        }
245
246        // Cancel all tasks
247        self.cancellation_token.cancel();
248
249        // Wait for all tasks to complete
250        for task in self.tasks.drain(..) {
251            if let Err(e) = task.await {
252                tracing::error!("Error waiting for task to complete: {e}");
253            }
254        }
255
256        // Disconnect WebSocket client
257        if let Err(e) = self.ws_client.disconnect().await {
258            tracing::error!("Error disconnecting WebSocket client: {e}");
259        }
260
261        // Clear state
262        {
263            let mut instruments = self.instruments.write().unwrap();
264            instruments.clear();
265        }
266
267        self.is_connected.store(false, Ordering::Relaxed);
268        tracing::info!("Hyperliquid data client disconnected");
269
270        Ok(())
271    }
272
273    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
274        tracing::debug!("Requesting all instruments");
275
276        let instruments = {
277            let instruments_map = self.instruments.read().unwrap();
278            instruments_map.values().cloned().collect()
279        };
280
281        let response = DataResponse::Instruments(InstrumentsResponse::new(
282            request.request_id,
283            request.client_id.unwrap_or(self.client_id),
284            self.venue(),
285            instruments,
286            datetime_to_unix_nanos(request.start),
287            datetime_to_unix_nanos(request.end),
288            self.clock.get_time_ns(),
289            request.params.clone(),
290        ));
291
292        if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
293            tracing::error!("Failed to send instruments response: {err}");
294        }
295
296        Ok(())
297    }
298
299    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
300        tracing::debug!("Requesting instrument: {}", request.instrument_id);
301
302        let instrument = self.get_instrument(&request.instrument_id)?;
303
304        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
305            request.request_id,
306            request.client_id.unwrap_or(self.client_id),
307            instrument.id(),
308            instrument,
309            datetime_to_unix_nanos(request.start),
310            datetime_to_unix_nanos(request.end),
311            self.clock.get_time_ns(),
312            request.params.clone(),
313        )));
314
315        if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
316            tracing::error!("Failed to send instrument response: {err}");
317        }
318
319        Ok(())
320    }
321
322    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
323        tracing::debug!("Requesting bars for {}", request.bar_type);
324
325        let http = self.http_client.clone();
326        let sender = self.data_sender.clone();
327        let bar_type = request.bar_type;
328        let start = request.start;
329        let end = request.end;
330        let limit = request.limit.map(|n| n.get() as u32);
331        let request_id = request.request_id;
332        let client_id = request.client_id.unwrap_or(self.client_id);
333        let params = request.params.clone();
334        let clock = self.clock;
335        let start_nanos = datetime_to_unix_nanos(start);
336        let end_nanos = datetime_to_unix_nanos(end);
337        let instruments = Arc::clone(&self.instruments);
338
339        tokio::spawn(async move {
340            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
341                Ok(bars) => {
342                    let response = DataResponse::Bars(BarsResponse::new(
343                        request_id,
344                        client_id,
345                        bar_type,
346                        bars,
347                        start_nanos,
348                        end_nanos,
349                        clock.get_time_ns(),
350                        params,
351                    ));
352                    if let Err(err) = sender.send(DataEvent::Response(response)) {
353                        tracing::error!("Failed to send bars response: {err}");
354                    }
355                }
356                Err(err) => tracing::error!("Bar request failed: {err:?}"),
357            }
358        });
359
360        Ok(())
361    }
362
363    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
364        tracing::debug!("Requesting trades for {}", request.instrument_id);
365
366        let trades = Vec::new();
367
368        let response = DataResponse::Trades(TradesResponse::new(
369            request.request_id,
370            request.client_id.unwrap_or(self.client_id),
371            request.instrument_id,
372            trades,
373            datetime_to_unix_nanos(request.start),
374            datetime_to_unix_nanos(request.end),
375            self.clock.get_time_ns(),
376            request.params.clone(),
377        ));
378
379        if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
380            tracing::error!("Failed to send trades response: {err}");
381        }
382
383        Ok(())
384    }
385
386    fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
387        tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
388
389        // Validate instrument exists
390        let instruments = self.instruments.read().unwrap();
391        if !instruments.contains_key(&subscription.instrument_id) {
392            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
393        }
394
395        // Extract coin symbol from instrument ID
396        let coin = subscription
397            .instrument_id
398            .symbol
399            .as_str()
400            .split('-')
401            .next()
402            .context("Invalid instrument symbol")?;
403        let coin = Ustr::from(coin);
404
405        // Clone WebSocket client for async task
406        let ws = self.ws_client.clone();
407
408        // Spawn async task to subscribe
409        tokio::spawn(async move {
410            if let Err(err) = ws.subscribe_trades(coin).await {
411                tracing::error!("Failed to subscribe to trades: {err:?}");
412            }
413        });
414
415        tracing::info!("Subscribed to trades for {}", subscription.instrument_id);
416
417        Ok(())
418    }
419
420    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
421        tracing::debug!(
422            "Unsubscribing from trades: {}",
423            unsubscription.instrument_id
424        );
425
426        // Extract coin symbol from instrument ID
427        let coin = unsubscription
428            .instrument_id
429            .symbol
430            .as_str()
431            .split('-')
432            .next()
433            .context("Invalid instrument symbol")?;
434        let coin = Ustr::from(coin);
435
436        // Clone WebSocket client for async task
437        let ws = self.ws_client.clone();
438
439        // Spawn async task to unsubscribe
440        tokio::spawn(async move {
441            if let Err(err) = ws.unsubscribe_trades(coin).await {
442                tracing::error!("Failed to unsubscribe from trades: {err:?}");
443            }
444        });
445
446        tracing::info!(
447            "Unsubscribed from trades for {}",
448            unsubscription.instrument_id
449        );
450
451        Ok(())
452    }
453
454    fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
455        tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
456
457        // Validate book type
458        if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
459            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
460        }
461
462        // Validate instrument exists
463        let instruments = self.instruments.read().unwrap();
464        if !instruments.contains_key(&subscription.instrument_id) {
465            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
466        }
467        drop(instruments);
468
469        // Extract coin symbol from instrument ID
470        let coin = subscription
471            .instrument_id
472            .symbol
473            .as_str()
474            .split('-')
475            .next()
476            .context("Invalid instrument symbol")?;
477        let coin = Ustr::from(coin);
478
479        // Clone WebSocket client for async task
480        let ws = self.ws_client.clone();
481
482        // Spawn async task to subscribe
483        tokio::spawn(async move {
484            if let Err(err) = ws.subscribe_book(coin).await {
485                tracing::error!("Failed to subscribe to book deltas: {err:?}");
486            }
487        });
488
489        tracing::info!(
490            "Subscribed to book deltas for {}",
491            subscription.instrument_id
492        );
493
494        Ok(())
495    }
496
497    fn unsubscribe_book_deltas(
498        &mut self,
499        unsubscription: &UnsubscribeBookDeltas,
500    ) -> anyhow::Result<()> {
501        tracing::debug!(
502            "Unsubscribing from book deltas: {}",
503            unsubscription.instrument_id
504        );
505
506        // Extract coin symbol from instrument ID
507        let coin = unsubscription
508            .instrument_id
509            .symbol
510            .as_str()
511            .split('-')
512            .next()
513            .context("Invalid instrument symbol")?;
514        let coin = Ustr::from(coin);
515
516        // Clone WebSocket client for async task
517        let ws = self.ws_client.clone();
518
519        // Spawn async task to unsubscribe
520        tokio::spawn(async move {
521            if let Err(err) = ws.unsubscribe_book(coin).await {
522                tracing::error!("Failed to unsubscribe from book deltas: {err:?}");
523            }
524        });
525
526        tracing::info!(
527            "Unsubscribed from book deltas for {}",
528            unsubscription.instrument_id
529        );
530
531        Ok(())
532    }
533
534    fn subscribe_book_snapshots(
535        &mut self,
536        subscription: &SubscribeBookSnapshots,
537    ) -> anyhow::Result<()> {
538        tracing::debug!(
539            "Subscribing to book snapshots: {}",
540            subscription.instrument_id
541        );
542
543        // Validate book type
544        if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
545            anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
546        }
547
548        // Validate instrument exists
549        let instruments = self.instruments.read().unwrap();
550        if !instruments.contains_key(&subscription.instrument_id) {
551            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
552        }
553        drop(instruments);
554
555        // Extract coin symbol from instrument ID
556        let coin = subscription
557            .instrument_id
558            .symbol
559            .as_str()
560            .split('-')
561            .next()
562            .context("Invalid instrument symbol")?;
563        let coin = Ustr::from(coin);
564
565        // Clone WebSocket client for async task
566        let ws = self.ws_client.clone();
567
568        // Spawn async task to subscribe
569        tokio::spawn(async move {
570            if let Err(err) = ws.subscribe_bbo(coin).await {
571                tracing::error!("Failed to subscribe to book snapshots: {err:?}");
572            }
573        });
574
575        tracing::info!(
576            "Subscribed to book snapshots for {}",
577            subscription.instrument_id
578        );
579
580        Ok(())
581    }
582
583    fn unsubscribe_book_snapshots(
584        &mut self,
585        unsubscription: &UnsubscribeBookSnapshots,
586    ) -> anyhow::Result<()> {
587        tracing::debug!(
588            "Unsubscribing from book snapshots: {}",
589            unsubscription.instrument_id
590        );
591
592        // Extract coin symbol from instrument ID
593        let coin = unsubscription
594            .instrument_id
595            .symbol
596            .as_str()
597            .split('-')
598            .next()
599            .context("Invalid instrument symbol")?;
600        let coin = Ustr::from(coin);
601
602        // Clone WebSocket client for async task
603        let ws = self.ws_client.clone();
604
605        // Spawn async task to unsubscribe
606        tokio::spawn(async move {
607            if let Err(err) = ws.unsubscribe_bbo(coin).await {
608                tracing::error!("Failed to unsubscribe from book snapshots: {err:?}");
609            }
610        });
611
612        tracing::info!(
613            "Unsubscribed from book snapshots for {}",
614            unsubscription.instrument_id
615        );
616
617        Ok(())
618    }
619
620    fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
621        tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
622
623        // Validate instrument exists
624        let instruments = self.instruments.read().unwrap();
625        if !instruments.contains_key(&subscription.instrument_id) {
626            anyhow::bail!("Instrument {} not found", subscription.instrument_id);
627        }
628        drop(instruments);
629
630        // Extract coin symbol from instrument ID
631        let coin = subscription
632            .instrument_id
633            .symbol
634            .as_str()
635            .split('-')
636            .next()
637            .context("Invalid instrument symbol")?;
638        let coin = Ustr::from(coin);
639
640        // Clone WebSocket client for async task
641        let ws = self.ws_client.clone();
642
643        // Spawn async task to subscribe
644        tokio::spawn(async move {
645            if let Err(err) = ws.subscribe_bbo(coin).await {
646                tracing::error!("Failed to subscribe to quotes: {err:?}");
647            }
648        });
649
650        tracing::info!("Subscribed to quotes for {}", subscription.instrument_id);
651
652        Ok(())
653    }
654
655    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
656        tracing::debug!(
657            "Unsubscribing from quotes: {}",
658            unsubscription.instrument_id
659        );
660
661        // Extract coin symbol from instrument ID
662        let coin = unsubscription
663            .instrument_id
664            .symbol
665            .as_str()
666            .split('-')
667            .next()
668            .context("Invalid instrument symbol")?;
669        let coin = Ustr::from(coin);
670
671        // Clone WebSocket client for async task
672        let ws = self.ws_client.clone();
673
674        // Spawn async task to unsubscribe
675        tokio::spawn(async move {
676            if let Err(err) = ws.unsubscribe_bbo(coin).await {
677                tracing::error!("Failed to unsubscribe from quotes: {err:?}");
678            }
679        });
680
681        tracing::info!(
682            "Unsubscribed from quotes for {}",
683            unsubscription.instrument_id
684        );
685
686        Ok(())
687    }
688
689    fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
690        tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
691
692        // Validate instrument exists
693        let instruments = self.instruments.read().unwrap();
694        let instrument_id = subscription.bar_type.instrument_id();
695        if !instruments.contains_key(&instrument_id) {
696            anyhow::bail!("Instrument {} not found", instrument_id);
697        }
698
699        drop(instruments);
700
701        // Convert bar type to interval
702        let interval = bar_type_to_interval(&subscription.bar_type)?;
703
704        // Extract coin symbol from instrument ID
705        let coin = instrument_id
706            .symbol
707            .as_str()
708            .split('-')
709            .next()
710            .context("Invalid instrument symbol")?;
711        let coin = Ustr::from(coin);
712
713        // Clone WebSocket client for async task
714        let ws = self.ws_client.clone();
715
716        // Spawn async task to subscribe
717        tokio::spawn(async move {
718            if let Err(err) = ws.subscribe_candle(coin, interval).await {
719                tracing::error!("Failed to subscribe to bars: {err:?}");
720            }
721        });
722
723        tracing::info!("Subscribed to bars for {}", subscription.bar_type);
724
725        Ok(())
726    }
727
728    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
729        tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
730
731        // Convert bar type to interval
732        let interval = bar_type_to_interval(&unsubscription.bar_type)?;
733
734        // Extract coin symbol from instrument ID
735        let instrument_id = unsubscription.bar_type.instrument_id();
736        let coin = instrument_id
737            .symbol
738            .as_str()
739            .split('-')
740            .next()
741            .context("Invalid instrument symbol")?;
742        let coin = Ustr::from(coin);
743
744        // Clone WebSocket client for async task
745        let ws = self.ws_client.clone();
746
747        // Spawn async task to unsubscribe
748        tokio::spawn(async move {
749            if let Err(err) = ws.unsubscribe_candle(coin, interval).await {
750                tracing::error!("Failed to unsubscribe from bars: {err:?}");
751            }
752        });
753
754        tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
755
756        Ok(())
757    }
758}
759
760/// Convert BarType to Hyperliquid interval string.
761fn bar_type_to_interval(bar_type: &BarType) -> anyhow::Result<String> {
762    let spec = bar_type.spec();
763    let step = spec.step.get();
764
765    anyhow::ensure!(
766        bar_type.aggregation_source() == AggregationSource::External,
767        "Only EXTERNAL aggregation is supported"
768    );
769
770    let interval = match spec.aggregation {
771        BarAggregation::Minute => format!("{step}m"),
772        BarAggregation::Hour => format!("{step}h"),
773        BarAggregation::Day => format!("{step}d"),
774        a => anyhow::bail!("Hyperliquid does not support {a:?} aggregation"),
775    };
776
777    Ok(interval)
778}
779
780/// Convert HyperliquidCandle to Nautilus Bar.
781fn candle_to_bar(
782    candle: &HyperliquidCandle,
783    bar_type: BarType,
784    price_precision: u8,
785    size_precision: u8,
786) -> anyhow::Result<Bar> {
787    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000); // Convert ms to ns
788    let ts_event = ts_init;
789
790    let open = candle.open.parse::<f64>().context("parse open price")?;
791    let high = candle.high.parse::<f64>().context("parse high price")?;
792    let low = candle.low.parse::<f64>().context("parse low price")?;
793    let close = candle.close.parse::<f64>().context("parse close price")?;
794    let volume = candle.volume.parse::<f64>().context("parse volume")?;
795
796    Ok(Bar::new(
797        bar_type,
798        Price::new(open, price_precision),
799        Price::new(high, price_precision),
800        Price::new(low, price_precision),
801        Price::new(close, price_precision),
802        Quantity::new(volume, size_precision),
803        ts_event,
804        ts_init,
805    ))
806}
807
808/// Request bars from HTTP API.
809async fn request_bars_from_http(
810    http_client: HyperliquidHttpClient,
811    bar_type: BarType,
812    start: Option<DateTime<Utc>>,
813    end: Option<DateTime<Utc>>,
814    limit: Option<u32>,
815    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
816) -> anyhow::Result<Vec<Bar>> {
817    // Get instrument details for precision
818    let instrument_id = bar_type.instrument_id();
819    let instrument = {
820        let guard = instruments.read().unwrap();
821        guard
822            .get(&instrument_id)
823            .cloned()
824            .context("Instrument not found in cache")?
825    };
826
827    let price_precision = instrument.price_precision();
828    let size_precision = instrument.size_precision();
829
830    // Extract coin symbol from instrument ID (e.g., "BTC-PERP.HYPERLIQUID" -> "BTC")
831    let coin = instrument_id
832        .symbol
833        .as_str()
834        .split('-')
835        .next()
836        .context("Invalid instrument symbol")?;
837
838    // Convert bar type to Hyperliquid interval
839    let interval = bar_type_to_interval(&bar_type)?;
840
841    // Calculate time range (Hyperliquid uses milliseconds)
842    let now = Utc::now();
843    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
844    let start_time = if let Some(start) = start {
845        start.timestamp_millis() as u64
846    } else {
847        // Default to 1000 bars before end_time
848        let spec = bar_type.spec();
849        let step_ms = match spec.aggregation {
850            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
851            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
852            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
853            _ => 60_000, // Default to 1 minute
854        };
855        end_time.saturating_sub(1000 * step_ms)
856    };
857
858    // Fetch candles from API
859    let response = http_client
860        .info_candle_snapshot(coin, &interval, start_time, end_time)
861        .await
862        .context("Failed to fetch candle snapshot from Hyperliquid")?;
863
864    // Convert candles to bars
865    let mut bars: Vec<Bar> = response
866        .data
867        .iter()
868        .filter_map(|candle| {
869            candle_to_bar(candle, bar_type, price_precision, size_precision)
870                .map_err(|e| {
871                    tracing::warn!("Failed to convert candle to bar: {e}");
872                    e
873                })
874                .ok()
875        })
876        .collect();
877
878    // Apply limit if specified
879    if let Some(limit) = limit
880        && bars.len() > limit as usize
881    {
882        bars = bars.into_iter().take(limit as usize).collect();
883    }
884
885    tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
886    Ok(bars)
887}