1use std::sync::{
17 Arc, RwLock,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 messages::{
26 DataEvent,
27 data::{
28 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
29 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
30 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
31 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
32 UnsubscribeQuotes, UnsubscribeTrades,
33 },
34 },
35 runner::get_data_event_sender,
36};
37use nautilus_core::{
38 UnixNanos,
39 time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_data::client::DataClient;
42use nautilus_model::{
43 data::{Bar, BarType, Data},
44 enums::{AggregationSource, BarAggregation},
45 identifiers::{ClientId, InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47 types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54 common::consts::{HYPERLIQUID_TESTNET_WS_URL, HYPERLIQUID_VENUE, HYPERLIQUID_WS_URL},
55 config::HyperliquidDataClientConfig,
56 http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57 websocket::client::HyperliquidWebSocketClient,
58};
59
60#[derive(Debug)]
61pub struct HyperliquidDataClient {
62 client_id: ClientId,
63 #[allow(dead_code)]
64 config: HyperliquidDataClientConfig,
65 http_client: HyperliquidHttpClient,
66 ws_client: HyperliquidWebSocketClient,
67 is_connected: AtomicBool,
68 cancellation_token: CancellationToken,
69 tasks: Vec<JoinHandle<()>>,
70 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
71 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
72 clock: &'static AtomicTime,
73 #[allow(dead_code)]
74 instrument_refresh_active: bool,
75}
76
77impl HyperliquidDataClient {
78 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
84 let clock = get_atomic_clock_realtime();
85 let data_sender = get_data_event_sender();
86
87 let http_client = if let Some(private_key_str) = &config.private_key {
88 let secrets = crate::common::credential::Secrets {
89 private_key: crate::common::credential::EvmPrivateKey::new(
90 private_key_str.clone(),
91 )?,
92 is_testnet: config.is_testnet,
93 vault_address: None,
94 };
95 HyperliquidHttpClient::with_credentials(&secrets, config.http_timeout_secs)
96 } else {
97 HyperliquidHttpClient::new(config.is_testnet, config.http_timeout_secs)
98 };
99
100 let ws_url = if config.is_testnet {
101 HYPERLIQUID_TESTNET_WS_URL
102 } else {
103 HYPERLIQUID_WS_URL
104 };
105 let ws_client = HyperliquidWebSocketClient::new(ws_url.to_string());
106
107 Ok(Self {
108 client_id,
109 config,
110 http_client,
111 ws_client,
112 is_connected: AtomicBool::new(false),
113 cancellation_token: CancellationToken::new(),
114 tasks: Vec::new(),
115 data_sender,
116 instruments: Arc::new(RwLock::new(AHashMap::new())),
117 clock,
118 instrument_refresh_active: false,
119 })
120 }
121
122 fn venue(&self) -> Venue {
123 *HYPERLIQUID_VENUE
124 }
125
126 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
127 let instruments = self
128 .http_client
129 .request_instruments()
130 .await
131 .context("Failed to fetch instruments during bootstrap")?;
132
133 let mut instruments_map = self.instruments.write().unwrap();
134 for instrument in &instruments {
135 instruments_map.insert(instrument.id(), instrument.clone());
136 }
137
138 tracing::info!("Bootstrapped {} instruments", instruments_map.len());
139 Ok(instruments)
140 }
141
142 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
143 self.ws_client
144 .ensure_connected()
145 .await
146 .context("Failed to connect to Hyperliquid WebSocket")?;
147
148 Ok(())
149 }
150
151 fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
152 let instruments = self.instruments.read().unwrap();
153 instruments
154 .get(instrument_id)
155 .cloned()
156 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
157 }
158}
159
160fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
161 value
162 .and_then(|dt| dt.timestamp_nanos_opt())
163 .and_then(|nanos| u64::try_from(nanos).ok())
164 .map(UnixNanos::from)
165}
166
167impl HyperliquidDataClient {
168 #[allow(dead_code)]
169 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
170 if let Err(err) = sender.send(DataEvent::Data(data)) {
171 tracing::error!("Failed to emit data event: {err}");
172 }
173 }
174}
175
176#[async_trait::async_trait]
177impl DataClient for HyperliquidDataClient {
178 fn client_id(&self) -> ClientId {
179 self.client_id
180 }
181
182 fn venue(&self) -> Option<Venue> {
183 Some(self.venue())
184 }
185
186 fn start(&mut self) -> anyhow::Result<()> {
187 tracing::info!("Starting Hyperliquid data client {}", self.client_id);
188 Ok(())
189 }
190
191 fn stop(&mut self) -> anyhow::Result<()> {
192 tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
193 self.cancellation_token.cancel();
194 self.is_connected.store(false, Ordering::Relaxed);
195 Ok(())
196 }
197
198 fn reset(&mut self) -> anyhow::Result<()> {
199 tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
200 self.is_connected.store(false, Ordering::Relaxed);
201 self.cancellation_token = CancellationToken::new();
202 self.tasks.clear();
203 Ok(())
204 }
205
206 fn dispose(&mut self) -> anyhow::Result<()> {
207 tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
208 self.stop()
209 }
210
211 fn is_connected(&self) -> bool {
212 self.is_connected.load(Ordering::Acquire)
213 }
214
215 fn is_disconnected(&self) -> bool {
216 !self.is_connected()
217 }
218
219 async fn connect(&mut self) -> anyhow::Result<()> {
220 if self.is_connected() {
221 return Ok(());
222 }
223
224 let _instruments = self
226 .bootstrap_instruments()
227 .await
228 .context("Failed to bootstrap instruments")?;
229
230 self.spawn_ws()
232 .await
233 .context("Failed to spawn WebSocket client")?;
234
235 self.is_connected.store(true, Ordering::Relaxed);
236 tracing::info!("Hyperliquid data client connected");
237
238 Ok(())
239 }
240
241 async fn disconnect(&mut self) -> anyhow::Result<()> {
242 if !self.is_connected() {
243 return Ok(());
244 }
245
246 self.cancellation_token.cancel();
248
249 for task in self.tasks.drain(..) {
251 if let Err(e) = task.await {
252 tracing::error!("Error waiting for task to complete: {e}");
253 }
254 }
255
256 if let Err(e) = self.ws_client.disconnect().await {
258 tracing::error!("Error disconnecting WebSocket client: {e}");
259 }
260
261 {
263 let mut instruments = self.instruments.write().unwrap();
264 instruments.clear();
265 }
266
267 self.is_connected.store(false, Ordering::Relaxed);
268 tracing::info!("Hyperliquid data client disconnected");
269
270 Ok(())
271 }
272
273 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
274 tracing::debug!("Requesting all instruments");
275
276 let instruments = {
277 let instruments_map = self.instruments.read().unwrap();
278 instruments_map.values().cloned().collect()
279 };
280
281 let response = DataResponse::Instruments(InstrumentsResponse::new(
282 request.request_id,
283 request.client_id.unwrap_or(self.client_id),
284 self.venue(),
285 instruments,
286 datetime_to_unix_nanos(request.start),
287 datetime_to_unix_nanos(request.end),
288 self.clock.get_time_ns(),
289 request.params.clone(),
290 ));
291
292 if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
293 tracing::error!("Failed to send instruments response: {err}");
294 }
295
296 Ok(())
297 }
298
299 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
300 tracing::debug!("Requesting instrument: {}", request.instrument_id);
301
302 let instrument = self.get_instrument(&request.instrument_id)?;
303
304 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
305 request.request_id,
306 request.client_id.unwrap_or(self.client_id),
307 instrument.id(),
308 instrument,
309 datetime_to_unix_nanos(request.start),
310 datetime_to_unix_nanos(request.end),
311 self.clock.get_time_ns(),
312 request.params.clone(),
313 )));
314
315 if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
316 tracing::error!("Failed to send instrument response: {err}");
317 }
318
319 Ok(())
320 }
321
322 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
323 tracing::debug!("Requesting bars for {}", request.bar_type);
324
325 let http = self.http_client.clone();
326 let sender = self.data_sender.clone();
327 let bar_type = request.bar_type;
328 let start = request.start;
329 let end = request.end;
330 let limit = request.limit.map(|n| n.get() as u32);
331 let request_id = request.request_id;
332 let client_id = request.client_id.unwrap_or(self.client_id);
333 let params = request.params.clone();
334 let clock = self.clock;
335 let start_nanos = datetime_to_unix_nanos(start);
336 let end_nanos = datetime_to_unix_nanos(end);
337 let instruments = Arc::clone(&self.instruments);
338
339 tokio::spawn(async move {
340 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
341 Ok(bars) => {
342 let response = DataResponse::Bars(BarsResponse::new(
343 request_id,
344 client_id,
345 bar_type,
346 bars,
347 start_nanos,
348 end_nanos,
349 clock.get_time_ns(),
350 params,
351 ));
352 if let Err(err) = sender.send(DataEvent::Response(response)) {
353 tracing::error!("Failed to send bars response: {err}");
354 }
355 }
356 Err(err) => tracing::error!("Bar request failed: {err:?}"),
357 }
358 });
359
360 Ok(())
361 }
362
363 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
364 tracing::debug!("Requesting trades for {}", request.instrument_id);
365
366 let trades = Vec::new();
367
368 let response = DataResponse::Trades(TradesResponse::new(
369 request.request_id,
370 request.client_id.unwrap_or(self.client_id),
371 request.instrument_id,
372 trades,
373 datetime_to_unix_nanos(request.start),
374 datetime_to_unix_nanos(request.end),
375 self.clock.get_time_ns(),
376 request.params.clone(),
377 ));
378
379 if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
380 tracing::error!("Failed to send trades response: {err}");
381 }
382
383 Ok(())
384 }
385
386 fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
387 tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
388
389 let instruments = self.instruments.read().unwrap();
391 if !instruments.contains_key(&subscription.instrument_id) {
392 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
393 }
394
395 let coin = subscription
397 .instrument_id
398 .symbol
399 .as_str()
400 .split('-')
401 .next()
402 .context("Invalid instrument symbol")?;
403 let coin = Ustr::from(coin);
404
405 let ws = self.ws_client.clone();
407
408 tokio::spawn(async move {
410 if let Err(err) = ws.subscribe_trades(coin).await {
411 tracing::error!("Failed to subscribe to trades: {err:?}");
412 }
413 });
414
415 tracing::info!("Subscribed to trades for {}", subscription.instrument_id);
416
417 Ok(())
418 }
419
420 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
421 tracing::debug!(
422 "Unsubscribing from trades: {}",
423 unsubscription.instrument_id
424 );
425
426 let coin = unsubscription
428 .instrument_id
429 .symbol
430 .as_str()
431 .split('-')
432 .next()
433 .context("Invalid instrument symbol")?;
434 let coin = Ustr::from(coin);
435
436 let ws = self.ws_client.clone();
438
439 tokio::spawn(async move {
441 if let Err(err) = ws.unsubscribe_trades(coin).await {
442 tracing::error!("Failed to unsubscribe from trades: {err:?}");
443 }
444 });
445
446 tracing::info!(
447 "Unsubscribed from trades for {}",
448 unsubscription.instrument_id
449 );
450
451 Ok(())
452 }
453
454 fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
455 tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
456
457 if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
459 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
460 }
461
462 let instruments = self.instruments.read().unwrap();
464 if !instruments.contains_key(&subscription.instrument_id) {
465 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
466 }
467 drop(instruments);
468
469 let coin = subscription
471 .instrument_id
472 .symbol
473 .as_str()
474 .split('-')
475 .next()
476 .context("Invalid instrument symbol")?;
477 let coin = Ustr::from(coin);
478
479 let ws = self.ws_client.clone();
481
482 tokio::spawn(async move {
484 if let Err(err) = ws.subscribe_book(coin).await {
485 tracing::error!("Failed to subscribe to book deltas: {err:?}");
486 }
487 });
488
489 tracing::info!(
490 "Subscribed to book deltas for {}",
491 subscription.instrument_id
492 );
493
494 Ok(())
495 }
496
497 fn unsubscribe_book_deltas(
498 &mut self,
499 unsubscription: &UnsubscribeBookDeltas,
500 ) -> anyhow::Result<()> {
501 tracing::debug!(
502 "Unsubscribing from book deltas: {}",
503 unsubscription.instrument_id
504 );
505
506 let coin = unsubscription
508 .instrument_id
509 .symbol
510 .as_str()
511 .split('-')
512 .next()
513 .context("Invalid instrument symbol")?;
514 let coin = Ustr::from(coin);
515
516 let ws = self.ws_client.clone();
518
519 tokio::spawn(async move {
521 if let Err(err) = ws.unsubscribe_book(coin).await {
522 tracing::error!("Failed to unsubscribe from book deltas: {err:?}");
523 }
524 });
525
526 tracing::info!(
527 "Unsubscribed from book deltas for {}",
528 unsubscription.instrument_id
529 );
530
531 Ok(())
532 }
533
534 fn subscribe_book_snapshots(
535 &mut self,
536 subscription: &SubscribeBookSnapshots,
537 ) -> anyhow::Result<()> {
538 tracing::debug!(
539 "Subscribing to book snapshots: {}",
540 subscription.instrument_id
541 );
542
543 if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
545 anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
546 }
547
548 let instruments = self.instruments.read().unwrap();
550 if !instruments.contains_key(&subscription.instrument_id) {
551 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
552 }
553 drop(instruments);
554
555 let coin = subscription
557 .instrument_id
558 .symbol
559 .as_str()
560 .split('-')
561 .next()
562 .context("Invalid instrument symbol")?;
563 let coin = Ustr::from(coin);
564
565 let ws = self.ws_client.clone();
567
568 tokio::spawn(async move {
570 if let Err(err) = ws.subscribe_bbo(coin).await {
571 tracing::error!("Failed to subscribe to book snapshots: {err:?}");
572 }
573 });
574
575 tracing::info!(
576 "Subscribed to book snapshots for {}",
577 subscription.instrument_id
578 );
579
580 Ok(())
581 }
582
583 fn unsubscribe_book_snapshots(
584 &mut self,
585 unsubscription: &UnsubscribeBookSnapshots,
586 ) -> anyhow::Result<()> {
587 tracing::debug!(
588 "Unsubscribing from book snapshots: {}",
589 unsubscription.instrument_id
590 );
591
592 let coin = unsubscription
594 .instrument_id
595 .symbol
596 .as_str()
597 .split('-')
598 .next()
599 .context("Invalid instrument symbol")?;
600 let coin = Ustr::from(coin);
601
602 let ws = self.ws_client.clone();
604
605 tokio::spawn(async move {
607 if let Err(err) = ws.unsubscribe_bbo(coin).await {
608 tracing::error!("Failed to unsubscribe from book snapshots: {err:?}");
609 }
610 });
611
612 tracing::info!(
613 "Unsubscribed from book snapshots for {}",
614 unsubscription.instrument_id
615 );
616
617 Ok(())
618 }
619
620 fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
621 tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
622
623 let instruments = self.instruments.read().unwrap();
625 if !instruments.contains_key(&subscription.instrument_id) {
626 anyhow::bail!("Instrument {} not found", subscription.instrument_id);
627 }
628 drop(instruments);
629
630 let coin = subscription
632 .instrument_id
633 .symbol
634 .as_str()
635 .split('-')
636 .next()
637 .context("Invalid instrument symbol")?;
638 let coin = Ustr::from(coin);
639
640 let ws = self.ws_client.clone();
642
643 tokio::spawn(async move {
645 if let Err(err) = ws.subscribe_bbo(coin).await {
646 tracing::error!("Failed to subscribe to quotes: {err:?}");
647 }
648 });
649
650 tracing::info!("Subscribed to quotes for {}", subscription.instrument_id);
651
652 Ok(())
653 }
654
655 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
656 tracing::debug!(
657 "Unsubscribing from quotes: {}",
658 unsubscription.instrument_id
659 );
660
661 let coin = unsubscription
663 .instrument_id
664 .symbol
665 .as_str()
666 .split('-')
667 .next()
668 .context("Invalid instrument symbol")?;
669 let coin = Ustr::from(coin);
670
671 let ws = self.ws_client.clone();
673
674 tokio::spawn(async move {
676 if let Err(err) = ws.unsubscribe_bbo(coin).await {
677 tracing::error!("Failed to unsubscribe from quotes: {err:?}");
678 }
679 });
680
681 tracing::info!(
682 "Unsubscribed from quotes for {}",
683 unsubscription.instrument_id
684 );
685
686 Ok(())
687 }
688
689 fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
690 tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
691
692 let instruments = self.instruments.read().unwrap();
694 let instrument_id = subscription.bar_type.instrument_id();
695 if !instruments.contains_key(&instrument_id) {
696 anyhow::bail!("Instrument {} not found", instrument_id);
697 }
698
699 drop(instruments);
700
701 let interval = bar_type_to_interval(&subscription.bar_type)?;
703
704 let coin = instrument_id
706 .symbol
707 .as_str()
708 .split('-')
709 .next()
710 .context("Invalid instrument symbol")?;
711 let coin = Ustr::from(coin);
712
713 let ws = self.ws_client.clone();
715
716 tokio::spawn(async move {
718 if let Err(err) = ws.subscribe_candle(coin, interval).await {
719 tracing::error!("Failed to subscribe to bars: {err:?}");
720 }
721 });
722
723 tracing::info!("Subscribed to bars for {}", subscription.bar_type);
724
725 Ok(())
726 }
727
728 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
729 tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
730
731 let interval = bar_type_to_interval(&unsubscription.bar_type)?;
733
734 let instrument_id = unsubscription.bar_type.instrument_id();
736 let coin = instrument_id
737 .symbol
738 .as_str()
739 .split('-')
740 .next()
741 .context("Invalid instrument symbol")?;
742 let coin = Ustr::from(coin);
743
744 let ws = self.ws_client.clone();
746
747 tokio::spawn(async move {
749 if let Err(err) = ws.unsubscribe_candle(coin, interval).await {
750 tracing::error!("Failed to unsubscribe from bars: {err:?}");
751 }
752 });
753
754 tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
755
756 Ok(())
757 }
758}
759
760fn bar_type_to_interval(bar_type: &BarType) -> anyhow::Result<String> {
762 let spec = bar_type.spec();
763 let step = spec.step.get();
764
765 anyhow::ensure!(
766 bar_type.aggregation_source() == AggregationSource::External,
767 "Only EXTERNAL aggregation is supported"
768 );
769
770 let interval = match spec.aggregation {
771 BarAggregation::Minute => format!("{step}m"),
772 BarAggregation::Hour => format!("{step}h"),
773 BarAggregation::Day => format!("{step}d"),
774 a => anyhow::bail!("Hyperliquid does not support {a:?} aggregation"),
775 };
776
777 Ok(interval)
778}
779
780fn candle_to_bar(
782 candle: &HyperliquidCandle,
783 bar_type: BarType,
784 price_precision: u8,
785 size_precision: u8,
786) -> anyhow::Result<Bar> {
787 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000); let ts_event = ts_init;
789
790 let open = candle.open.parse::<f64>().context("parse open price")?;
791 let high = candle.high.parse::<f64>().context("parse high price")?;
792 let low = candle.low.parse::<f64>().context("parse low price")?;
793 let close = candle.close.parse::<f64>().context("parse close price")?;
794 let volume = candle.volume.parse::<f64>().context("parse volume")?;
795
796 Ok(Bar::new(
797 bar_type,
798 Price::new(open, price_precision),
799 Price::new(high, price_precision),
800 Price::new(low, price_precision),
801 Price::new(close, price_precision),
802 Quantity::new(volume, size_precision),
803 ts_event,
804 ts_init,
805 ))
806}
807
808async fn request_bars_from_http(
810 http_client: HyperliquidHttpClient,
811 bar_type: BarType,
812 start: Option<DateTime<Utc>>,
813 end: Option<DateTime<Utc>>,
814 limit: Option<u32>,
815 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
816) -> anyhow::Result<Vec<Bar>> {
817 let instrument_id = bar_type.instrument_id();
819 let instrument = {
820 let guard = instruments.read().unwrap();
821 guard
822 .get(&instrument_id)
823 .cloned()
824 .context("Instrument not found in cache")?
825 };
826
827 let price_precision = instrument.price_precision();
828 let size_precision = instrument.size_precision();
829
830 let coin = instrument_id
832 .symbol
833 .as_str()
834 .split('-')
835 .next()
836 .context("Invalid instrument symbol")?;
837
838 let interval = bar_type_to_interval(&bar_type)?;
840
841 let now = Utc::now();
843 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
844 let start_time = if let Some(start) = start {
845 start.timestamp_millis() as u64
846 } else {
847 let spec = bar_type.spec();
849 let step_ms = match spec.aggregation {
850 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
851 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
852 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
853 _ => 60_000, };
855 end_time.saturating_sub(1000 * step_ms)
856 };
857
858 let response = http_client
860 .info_candle_snapshot(coin, &interval, start_time, end_time)
861 .await
862 .context("Failed to fetch candle snapshot from Hyperliquid")?;
863
864 let mut bars: Vec<Bar> = response
866 .data
867 .iter()
868 .filter_map(|candle| {
869 candle_to_bar(candle, bar_type, price_precision, size_precision)
870 .map_err(|e| {
871 tracing::warn!("Failed to convert candle to bar: {e}");
872 e
873 })
874 .ok()
875 })
876 .collect();
877
878 if let Some(limit) = limit
880 && bars.len() > limit as usize
881 {
882 bars = bars.into_iter().take(limit as usize).collect();
883 }
884
885 tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
886 Ok(bars)
887}