1use std::sync::{
17 Arc, RwLock,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 messages::{
26 DataEvent,
27 data::{
28 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
29 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
30 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
31 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
32 UnsubscribeQuotes, UnsubscribeTrades,
33 },
34 },
35 runner::get_data_event_sender,
36};
37use nautilus_core::{
38 UnixNanos,
39 time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_data::client::DataClient;
42use nautilus_model::{
43 data::{Bar, BarType, Data},
44 enums::{AggregationSource, BarAggregation},
45 identifiers::{ClientId, InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47 types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54 common::consts::{HYPERLIQUID_TESTNET_WS_URL, HYPERLIQUID_VENUE, HYPERLIQUID_WS_URL},
55 config::HyperliquidDataClientConfig,
56 http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57 websocket::{
58 client::HyperliquidWebSocketClient, messages::HyperliquidWsMessage,
59 parse::parse_ws_quote_tick,
60 },
61};
62
/// Data client for the Hyperliquid venue.
///
/// Bundles an HTTP client (instrument bootstrap, historical bars) with a
/// WebSocket client (live subscriptions) and two shared lookup caches that are
/// filled during `bootstrap_instruments`.
#[derive(Debug)]
pub struct HyperliquidDataClient {
    /// Identifier returned by `DataClient::client_id` and used as the default
    /// client id on responses.
    client_id: ClientId,
    /// Configuration this client was constructed from; retained but not read
    /// again after `new` in the visible code.
    #[allow(dead_code)]
    config: HyperliquidDataClientConfig,
    /// HTTP client used for instrument and candle snapshot requests.
    http_client: HyperliquidHttpClient,
    /// WebSocket client used for all subscribe/unsubscribe calls.
    ws_client: HyperliquidWebSocketClient,
    /// Connection flag toggled by `connect`, `disconnect`, `stop` and `reset`.
    is_connected: AtomicBool,
    /// Token cancelled on `stop`/`disconnect` and replaced on `reset`.
    cancellation_token: CancellationToken,
    /// Join handles awaited in `disconnect`; nothing in the visible code pushes
    /// onto this vector.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data and response events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument id.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Maps the part of the symbol before the first `-` (the "coin") to its
    /// instrument id.
    coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
    /// Clock read when timestamping responses and parsed quotes.
    clock: &'static AtomicTime,
    /// Initialised to `false` in `new`; not read anywhere in the visible code.
    #[allow(dead_code)]
    instrument_refresh_active: bool,
}
82
83impl HyperliquidDataClient {
84 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
90 let clock = get_atomic_clock_realtime();
91 let data_sender = get_data_event_sender();
92
93 let http_client = if let Some(private_key_str) = &config.private_key {
94 let secrets = crate::common::credential::Secrets {
95 private_key: crate::common::credential::EvmPrivateKey::new(
96 private_key_str.clone(),
97 )?,
98 is_testnet: config.is_testnet,
99 vault_address: None,
100 };
101 HyperliquidHttpClient::with_credentials(&secrets, config.http_timeout_secs)
102 } else {
103 HyperliquidHttpClient::new(config.is_testnet, config.http_timeout_secs)
104 };
105
106 let ws_url = if config.is_testnet {
107 HYPERLIQUID_TESTNET_WS_URL
108 } else {
109 HYPERLIQUID_WS_URL
110 };
111 let ws_client = HyperliquidWebSocketClient::new(ws_url.to_string());
112
113 Ok(Self {
114 client_id,
115 config,
116 http_client,
117 ws_client,
118 is_connected: AtomicBool::new(false),
119 cancellation_token: CancellationToken::new(),
120 tasks: Vec::new(),
121 data_sender,
122 instruments: Arc::new(RwLock::new(AHashMap::new())),
123 coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
124 clock,
125 instrument_refresh_active: false,
126 })
127 }
128
129 fn venue(&self) -> Venue {
130 *HYPERLIQUID_VENUE
131 }
132
133 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
134 let instruments = self
135 .http_client
136 .request_instruments()
137 .await
138 .context("Failed to fetch instruments during bootstrap")?;
139
140 let mut instruments_map = self.instruments.write().unwrap();
141 let mut coin_map = self.coin_to_instrument_id.write().unwrap();
142
143 for instrument in &instruments {
144 let instrument_id = instrument.id();
145 instruments_map.insert(instrument_id, instrument.clone());
146
147 let symbol = instrument_id.symbol.as_str();
150 if let Some(coin) = symbol.split('-').next() {
151 coin_map.insert(Ustr::from(coin), instrument_id);
152 }
153
154 self.ws_client.add_instrument(instrument.clone());
157 }
158
159 tracing::info!(
160 "Bootstrapped {} instruments with {} coin mappings",
161 instruments_map.len(),
162 coin_map.len()
163 );
164 Ok(instruments)
165 }
166
167 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
168 self.ws_client
169 .ensure_connected()
170 .await
171 .context("Failed to connect to Hyperliquid WebSocket")?;
172
173 Ok(())
175 }
176
177 #[allow(dead_code)]
178 fn handle_ws_message(
179 msg: HyperliquidWsMessage,
180 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
181 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
182 coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
183 _venue: Venue,
184 clock: &'static AtomicTime,
185 ) {
186 match msg {
187 HyperliquidWsMessage::Bbo { data } => {
188 tracing::debug!("Received BBO message for coin: {}", data.coin);
189
190 let coin_map = coin_to_instrument_id.read().unwrap();
193 let instrument_id = coin_map.get(&data.coin);
194
195 if let Some(&instrument_id) = instrument_id {
196 let instruments_map = instruments.read().unwrap();
197 if let Some(instrument) = instruments_map.get(&instrument_id) {
198 let ts_init = clock.get_time_ns();
199
200 match parse_ws_quote_tick(&data, instrument, ts_init) {
201 Ok(quote_tick) => {
202 tracing::debug!(
203 "Parsed quote tick for {}: bid={}, ask={}",
204 data.coin,
205 quote_tick.bid_price,
206 quote_tick.ask_price
207 );
208 if let Err(e) =
209 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
210 {
211 tracing::error!("Failed to send quote tick: {e}");
212 }
213 }
214 Err(e) => {
215 tracing::error!(
216 "Failed to parse quote tick for {}: {e}",
217 data.coin
218 );
219 }
220 }
221 }
222 } else {
223 tracing::warn!(
224 "Received BBO for unknown coin: {} (no matching instrument found)",
225 data.coin
226 );
227 }
228 }
229 _ => {
230 tracing::trace!("Received WebSocket message: {:?}", msg);
232 }
233 }
234 }
235
236 fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
237 let instruments = self.instruments.read().unwrap();
238 instruments
239 .get(instrument_id)
240 .cloned()
241 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
242 }
243}
244
245fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
246 value
247 .and_then(|dt| dt.timestamp_nanos_opt())
248 .and_then(|nanos| u64::try_from(nanos).ok())
249 .map(UnixNanos::from)
250}
251
252impl HyperliquidDataClient {
253 #[allow(dead_code)]
254 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
255 if let Err(e) = sender.send(DataEvent::Data(data)) {
256 tracing::error!("Failed to emit data event: {e}");
257 }
258 }
259}
260
261#[async_trait::async_trait]
262impl DataClient for HyperliquidDataClient {
263 fn client_id(&self) -> ClientId {
264 self.client_id
265 }
266
267 fn venue(&self) -> Option<Venue> {
268 Some(self.venue())
269 }
270
271 fn start(&mut self) -> anyhow::Result<()> {
272 tracing::info!("Starting Hyperliquid data client {}", self.client_id);
273 Ok(())
274 }
275
276 fn stop(&mut self) -> anyhow::Result<()> {
277 tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
278 self.cancellation_token.cancel();
279 self.is_connected.store(false, Ordering::Relaxed);
280 Ok(())
281 }
282
283 fn reset(&mut self) -> anyhow::Result<()> {
284 tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
285 self.is_connected.store(false, Ordering::Relaxed);
286 self.cancellation_token = CancellationToken::new();
287 self.tasks.clear();
288 Ok(())
289 }
290
291 fn dispose(&mut self) -> anyhow::Result<()> {
292 tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
293 self.stop()
294 }
295
296 fn is_connected(&self) -> bool {
297 self.is_connected.load(Ordering::Acquire)
298 }
299
300 fn is_disconnected(&self) -> bool {
301 !self.is_connected()
302 }
303
304 async fn connect(&mut self) -> anyhow::Result<()> {
305 if self.is_connected() {
306 return Ok(());
307 }
308
309 let _instruments = self
311 .bootstrap_instruments()
312 .await
313 .context("Failed to bootstrap instruments")?;
314
315 self.spawn_ws()
317 .await
318 .context("Failed to spawn WebSocket client")?;
319
320 self.is_connected.store(true, Ordering::Relaxed);
321 tracing::info!("Hyperliquid data client connected");
322
323 Ok(())
324 }
325
326 async fn disconnect(&mut self) -> anyhow::Result<()> {
327 if !self.is_connected() {
328 return Ok(());
329 }
330
331 self.cancellation_token.cancel();
333
334 for task in self.tasks.drain(..) {
336 if let Err(e) = task.await {
337 tracing::error!("Error waiting for task to complete: {e}");
338 }
339 }
340
341 if let Err(e) = self.ws_client.disconnect().await {
343 tracing::error!("Error disconnecting WebSocket client: {e}");
344 }
345
346 {
348 let mut instruments = self.instruments.write().unwrap();
349 instruments.clear();
350 }
351
352 self.is_connected.store(false, Ordering::Relaxed);
353 tracing::info!("Hyperliquid data client disconnected");
354
355 Ok(())
356 }
357
358 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
359 tracing::debug!("Requesting all instruments");
360
361 let instruments = {
362 let instruments_map = self.instruments.read().unwrap();
363 instruments_map.values().cloned().collect()
364 };
365
366 let response = DataResponse::Instruments(InstrumentsResponse::new(
367 request.request_id,
368 request.client_id.unwrap_or(self.client_id),
369 self.venue(),
370 instruments,
371 datetime_to_unix_nanos(request.start),
372 datetime_to_unix_nanos(request.end),
373 self.clock.get_time_ns(),
374 request.params.clone(),
375 ));
376
377 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
378 tracing::error!("Failed to send instruments response: {e}");
379 }
380
381 Ok(())
382 }
383
384 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
385 tracing::debug!("Requesting instrument: {}", request.instrument_id);
386
387 let instrument = self.get_instrument(&request.instrument_id)?;
388
389 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
390 request.request_id,
391 request.client_id.unwrap_or(self.client_id),
392 instrument.id(),
393 instrument,
394 datetime_to_unix_nanos(request.start),
395 datetime_to_unix_nanos(request.end),
396 self.clock.get_time_ns(),
397 request.params.clone(),
398 )));
399
400 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
401 tracing::error!("Failed to send instrument response: {e}");
402 }
403
404 Ok(())
405 }
406
407 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
408 tracing::debug!("Requesting bars for {}", request.bar_type);
409
410 let http = self.http_client.clone();
411 let sender = self.data_sender.clone();
412 let bar_type = request.bar_type;
413 let start = request.start;
414 let end = request.end;
415 let limit = request.limit.map(|n| n.get() as u32);
416 let request_id = request.request_id;
417 let client_id = request.client_id.unwrap_or(self.client_id);
418 let params = request.params.clone();
419 let clock = self.clock;
420 let start_nanos = datetime_to_unix_nanos(start);
421 let end_nanos = datetime_to_unix_nanos(end);
422 let instruments = Arc::clone(&self.instruments);
423
424 tokio::spawn(async move {
425 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
426 Ok(bars) => {
427 let response = DataResponse::Bars(BarsResponse::new(
428 request_id,
429 client_id,
430 bar_type,
431 bars,
432 start_nanos,
433 end_nanos,
434 clock.get_time_ns(),
435 params,
436 ));
437 if let Err(e) = sender.send(DataEvent::Response(response)) {
438 tracing::error!("Failed to send bars response: {e}");
439 }
440 }
441 Err(e) => tracing::error!("Bar request failed: {e:?}"),
442 }
443 });
444
445 Ok(())
446 }
447
448 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
449 tracing::debug!("Requesting trades for {}", request.instrument_id);
450
451 let trades = Vec::new();
452
453 let response = DataResponse::Trades(TradesResponse::new(
454 request.request_id,
455 request.client_id.unwrap_or(self.client_id),
456 request.instrument_id,
457 trades,
458 datetime_to_unix_nanos(request.start),
459 datetime_to_unix_nanos(request.end),
460 self.clock.get_time_ns(),
461 request.params.clone(),
462 ));
463
464 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
465 tracing::error!("Failed to send trades response: {e}");
466 }
467
468 Ok(())
469 }
470
471 fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
472 tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
473
474 let instruments = self.instruments.read().unwrap();
476 if !instruments.contains_key(&subscription.instrument_id) {
477 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
478 }
479
480 let coin = subscription
482 .instrument_id
483 .symbol
484 .as_str()
485 .split('-')
486 .next()
487 .context("Invalid instrument symbol")?;
488 let coin = Ustr::from(coin);
489
490 let ws = self.ws_client.clone();
492
493 tokio::spawn(async move {
495 if let Err(e) = ws.subscribe_trades(coin).await {
496 tracing::error!("Failed to subscribe to trades: {e:?}");
497 }
498 });
499
500 tracing::info!("Subscribed to trades for {}", subscription.instrument_id);
501
502 Ok(())
503 }
504
505 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
506 tracing::debug!(
507 "Unsubscribing from trades: {}",
508 unsubscription.instrument_id
509 );
510
511 let coin = unsubscription
513 .instrument_id
514 .symbol
515 .as_str()
516 .split('-')
517 .next()
518 .context("Invalid instrument symbol")?;
519 let coin = Ustr::from(coin);
520
521 let ws = self.ws_client.clone();
523
524 tokio::spawn(async move {
526 if let Err(e) = ws.unsubscribe_trades(coin).await {
527 tracing::error!("Failed to unsubscribe from trades: {e:?}");
528 }
529 });
530
531 tracing::info!(
532 "Unsubscribed from trades for {}",
533 unsubscription.instrument_id
534 );
535
536 Ok(())
537 }
538
539 fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
540 tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
541
542 if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
544 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
545 }
546
547 let instruments = self.instruments.read().unwrap();
549 if !instruments.contains_key(&subscription.instrument_id) {
550 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
551 }
552 drop(instruments);
553
554 let coin = subscription
556 .instrument_id
557 .symbol
558 .as_str()
559 .split('-')
560 .next()
561 .context("Invalid instrument symbol")?;
562 let coin = Ustr::from(coin);
563
564 let ws = self.ws_client.clone();
566
567 tokio::spawn(async move {
569 if let Err(e) = ws.subscribe_book(coin).await {
570 tracing::error!("Failed to subscribe to book deltas: {e:?}");
571 }
572 });
573
574 tracing::info!(
575 "Subscribed to book deltas for {}",
576 subscription.instrument_id
577 );
578
579 Ok(())
580 }
581
582 fn unsubscribe_book_deltas(
583 &mut self,
584 unsubscription: &UnsubscribeBookDeltas,
585 ) -> anyhow::Result<()> {
586 tracing::debug!(
587 "Unsubscribing from book deltas: {}",
588 unsubscription.instrument_id
589 );
590
591 let coin = unsubscription
593 .instrument_id
594 .symbol
595 .as_str()
596 .split('-')
597 .next()
598 .context("Invalid instrument symbol")?;
599 let coin = Ustr::from(coin);
600
601 let ws = self.ws_client.clone();
603
604 tokio::spawn(async move {
606 if let Err(e) = ws.unsubscribe_book(coin).await {
607 tracing::error!("Failed to unsubscribe from book deltas: {e:?}");
608 }
609 });
610
611 tracing::info!(
612 "Unsubscribed from book deltas for {}",
613 unsubscription.instrument_id
614 );
615
616 Ok(())
617 }
618
619 fn subscribe_book_snapshots(
620 &mut self,
621 subscription: &SubscribeBookSnapshots,
622 ) -> anyhow::Result<()> {
623 tracing::debug!(
624 "Subscribing to book snapshots: {}",
625 subscription.instrument_id
626 );
627
628 if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
630 anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
631 }
632
633 let instruments = self.instruments.read().unwrap();
635 if !instruments.contains_key(&subscription.instrument_id) {
636 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
637 }
638 drop(instruments);
639
640 let coin = subscription
642 .instrument_id
643 .symbol
644 .as_str()
645 .split('-')
646 .next()
647 .context("Invalid instrument symbol")?;
648 let coin = Ustr::from(coin);
649
650 let ws = self.ws_client.clone();
652
653 tokio::spawn(async move {
655 if let Err(e) = ws.subscribe_bbo(coin).await {
656 tracing::error!("Failed to subscribe to book snapshots: {e:?}");
657 }
658 });
659
660 tracing::info!(
661 "Subscribed to book snapshots for {}",
662 subscription.instrument_id
663 );
664
665 Ok(())
666 }
667
668 fn unsubscribe_book_snapshots(
669 &mut self,
670 unsubscription: &UnsubscribeBookSnapshots,
671 ) -> anyhow::Result<()> {
672 tracing::debug!(
673 "Unsubscribing from book snapshots: {}",
674 unsubscription.instrument_id
675 );
676
677 let coin = unsubscription
679 .instrument_id
680 .symbol
681 .as_str()
682 .split('-')
683 .next()
684 .context("Invalid instrument symbol")?;
685 let coin = Ustr::from(coin);
686
687 let ws = self.ws_client.clone();
689
690 tokio::spawn(async move {
692 if let Err(e) = ws.unsubscribe_bbo(coin).await {
693 tracing::error!("Failed to unsubscribe from book snapshots: {e:?}");
694 }
695 });
696
697 tracing::info!(
698 "Unsubscribed from book snapshots for {}",
699 unsubscription.instrument_id
700 );
701
702 Ok(())
703 }
704
705 fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
706 tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
707
708 let instruments = self.instruments.read().unwrap();
710 if !instruments.contains_key(&subscription.instrument_id) {
711 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
712 }
713 drop(instruments);
714
715 let coin = subscription
717 .instrument_id
718 .symbol
719 .as_str()
720 .split('-')
721 .next()
722 .context("Invalid instrument symbol")?;
723 let coin = Ustr::from(coin);
724
725 let ws = self.ws_client.clone();
727
728 tokio::spawn(async move {
730 if let Err(e) = ws.subscribe_bbo(coin).await {
731 tracing::error!("Failed to subscribe to quotes: {e:?}");
732 }
733 });
734
735 tracing::info!("Subscribed to quotes for {}", subscription.instrument_id);
736
737 Ok(())
738 }
739
740 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
741 tracing::debug!(
742 "Unsubscribing from quotes: {}",
743 unsubscription.instrument_id
744 );
745
746 let coin = unsubscription
748 .instrument_id
749 .symbol
750 .as_str()
751 .split('-')
752 .next()
753 .context("Invalid instrument symbol")?;
754 let coin = Ustr::from(coin);
755
756 let ws = self.ws_client.clone();
758
759 tokio::spawn(async move {
761 if let Err(e) = ws.unsubscribe_bbo(coin).await {
762 tracing::error!("Failed to unsubscribe from quotes: {e:?}");
763 }
764 });
765
766 tracing::info!(
767 "Unsubscribed from quotes for {}",
768 unsubscription.instrument_id
769 );
770
771 Ok(())
772 }
773
774 fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
775 tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
776
777 let instruments = self.instruments.read().unwrap();
779 let instrument_id = subscription.bar_type.instrument_id();
780 if !instruments.contains_key(&instrument_id) {
781 anyhow::bail!("Instrument {} not found", instrument_id);
782 }
783
784 drop(instruments);
785
786 let interval = bar_type_to_interval(&subscription.bar_type)?;
788
789 let coin = instrument_id
791 .symbol
792 .as_str()
793 .split('-')
794 .next()
795 .context("Invalid instrument symbol")?;
796 let coin = Ustr::from(coin);
797
798 let ws = self.ws_client.clone();
800
801 tokio::spawn(async move {
803 if let Err(e) = ws.subscribe_candle(coin, interval).await {
804 tracing::error!("Failed to subscribe to bars: {e:?}");
805 }
806 });
807
808 tracing::info!("Subscribed to bars for {}", subscription.bar_type);
809
810 Ok(())
811 }
812
813 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
814 tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
815
816 let interval = bar_type_to_interval(&unsubscription.bar_type)?;
818
819 let instrument_id = unsubscription.bar_type.instrument_id();
821 let coin = instrument_id
822 .symbol
823 .as_str()
824 .split('-')
825 .next()
826 .context("Invalid instrument symbol")?;
827 let coin = Ustr::from(coin);
828
829 let ws = self.ws_client.clone();
831
832 tokio::spawn(async move {
834 if let Err(e) = ws.unsubscribe_candle(coin, interval).await {
835 tracing::error!("Failed to unsubscribe from bars: {e:?}");
836 }
837 });
838
839 tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
840
841 Ok(())
842 }
843}
844
845fn bar_type_to_interval(bar_type: &BarType) -> anyhow::Result<String> {
847 let spec = bar_type.spec();
848 let step = spec.step.get();
849
850 anyhow::ensure!(
851 bar_type.aggregation_source() == AggregationSource::External,
852 "Only EXTERNAL aggregation is supported"
853 );
854
855 let interval = match spec.aggregation {
856 BarAggregation::Minute => format!("{step}m"),
857 BarAggregation::Hour => format!("{step}h"),
858 BarAggregation::Day => format!("{step}d"),
859 a => anyhow::bail!("Hyperliquid does not support {a:?} aggregation"),
860 };
861
862 Ok(interval)
863}
864
865fn candle_to_bar(
867 candle: &HyperliquidCandle,
868 bar_type: BarType,
869 price_precision: u8,
870 size_precision: u8,
871) -> anyhow::Result<Bar> {
872 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000); let ts_event = ts_init;
874
875 let open = candle.open.parse::<f64>().context("parse open price")?;
876 let high = candle.high.parse::<f64>().context("parse high price")?;
877 let low = candle.low.parse::<f64>().context("parse low price")?;
878 let close = candle.close.parse::<f64>().context("parse close price")?;
879 let volume = candle.volume.parse::<f64>().context("parse volume")?;
880
881 Ok(Bar::new(
882 bar_type,
883 Price::new(open, price_precision),
884 Price::new(high, price_precision),
885 Price::new(low, price_precision),
886 Price::new(close, price_precision),
887 Quantity::new(volume, size_precision),
888 ts_event,
889 ts_init,
890 ))
891}
892
893async fn request_bars_from_http(
895 http_client: HyperliquidHttpClient,
896 bar_type: BarType,
897 start: Option<DateTime<Utc>>,
898 end: Option<DateTime<Utc>>,
899 limit: Option<u32>,
900 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
901) -> anyhow::Result<Vec<Bar>> {
902 let instrument_id = bar_type.instrument_id();
904 let instrument = {
905 let guard = instruments.read().unwrap();
906 guard
907 .get(&instrument_id)
908 .cloned()
909 .context("Instrument not found in cache")?
910 };
911
912 let price_precision = instrument.price_precision();
913 let size_precision = instrument.size_precision();
914
915 let coin = instrument_id
917 .symbol
918 .as_str()
919 .split('-')
920 .next()
921 .context("Invalid instrument symbol")?;
922
923 let interval = bar_type_to_interval(&bar_type)?;
925
926 let now = Utc::now();
928 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
929 let start_time = if let Some(start) = start {
930 start.timestamp_millis() as u64
931 } else {
932 let spec = bar_type.spec();
934 let step_ms = match spec.aggregation {
935 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
936 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
937 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
938 _ => 60_000, };
940 end_time.saturating_sub(1000 * step_ms)
941 };
942
943 let response = http_client
945 .info_candle_snapshot(coin, &interval, start_time, end_time)
946 .await
947 .context("Failed to fetch candle snapshot from Hyperliquid")?;
948
949 let mut bars: Vec<Bar> = response
951 .data
952 .iter()
953 .filter_map(|candle| {
954 candle_to_bar(candle, bar_type, price_precision, size_precision)
955 .map_err(|e| {
956 tracing::warn!("Failed to convert candle to bar: {e}");
957 e
958 })
959 .ok()
960 })
961 .collect();
962
963 if let Some(limit) = limit
965 && bars.len() > limit as usize
966 {
967 bars = bars.into_iter().take(limit as usize).collect();
968 }
969
970 tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
971 Ok(bars)
972}