1use std::sync::{
17 Arc, RwLock,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 clients::DataClient,
26 live::{runner::get_data_event_sender, runtime::get_runtime},
27 messages::{
28 DataEvent,
29 data::{
30 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
31 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
32 SubscribeBookDeltas, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
33 UnsubscribeBookDeltas, UnsubscribeQuotes, UnsubscribeTrades,
34 },
35 },
36};
37use nautilus_core::{
38 UnixNanos,
39 datetime::datetime_to_unix_nanos,
40 time::{AtomicTime, get_atomic_clock_realtime},
41};
42use nautilus_model::{
43 data::{Bar, BarType, Data, OrderBookDeltas_API},
44 enums::{BarAggregation, BookType},
45 identifiers::{ClientId, InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47 types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54 common::{HyperliquidProductType, consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
55 config::HyperliquidDataClientConfig,
56 http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57 websocket::{
58 client::HyperliquidWebSocketClient,
59 messages::{HyperliquidWsMessage, NautilusWsMessage},
60 parse::{
61 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
62 },
63 },
64};
65
/// Data client for the Hyperliquid venue, combining an HTTP client and a WebSocket client.
///
/// Holds the connection flag, the handles of spawned background tasks and the
/// instrument caches read by the request and subscribe methods of this type.
#[derive(Debug)]
pub struct HyperliquidDataClient {
    /// Identifier returned by `client_id()` and used as the fallback client ID in responses.
    client_id: ClientId,
    /// Configuration this client was constructed with.
    #[allow(dead_code)]
    config: HyperliquidDataClientConfig,
    /// HTTP client used to fetch instruments and candle snapshots.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; clones of it are moved into spawned tasks.
    ws_client: HyperliquidWebSocketClient,
    /// Connection flag: set at the end of `connect`, cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token cancelled by `stop` and `disconnect` to end the WebSocket consumption loop.
    cancellation_token: CancellationToken,
    /// Handles of spawned background tasks; awaited in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, filled by `bootstrap_instruments`.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Maps each instrument's raw symbol (the venue "coin") to its instrument ID.
    coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
    /// Clock used for the `ts_init` of parsed data and the timestamp of responses.
    clock: &'static AtomicTime,
    /// Initialised to `false`; not read anywhere in the visible code.
    #[allow(dead_code)]
    instrument_refresh_active: bool,
}
85
86impl HyperliquidDataClient {
87 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
93 let clock = get_atomic_clock_realtime();
94 let data_sender = get_data_event_sender();
95
96 let http_client = if let Some(private_key_str) = &config.private_key {
97 let secrets = crate::common::credential::Secrets {
98 private_key: crate::common::credential::EvmPrivateKey::new(
99 private_key_str.clone(),
100 )?,
101 is_testnet: config.is_testnet,
102 vault_address: None,
103 };
104 HyperliquidHttpClient::with_credentials(
105 &secrets,
106 config.http_timeout_secs,
107 config.http_proxy_url.clone(),
108 )?
109 } else {
110 HyperliquidHttpClient::new(
111 config.is_testnet,
112 config.http_timeout_secs,
113 config.http_proxy_url.clone(),
114 )?
115 };
116
117 let ws_client = HyperliquidWebSocketClient::new(
120 None,
121 config.is_testnet,
122 HyperliquidProductType::Perp,
123 None,
124 );
125
126 Ok(Self {
127 client_id,
128 config,
129 http_client,
130 ws_client,
131 is_connected: AtomicBool::new(false),
132 cancellation_token: CancellationToken::new(),
133 tasks: Vec::new(),
134 data_sender,
135 instruments: Arc::new(RwLock::new(AHashMap::new())),
136 coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
137 clock,
138 instrument_refresh_active: false,
139 })
140 }
141
/// Returns the Hyperliquid venue constant (`HYPERLIQUID_VENUE`).
fn venue(&self) -> Venue {
    *HYPERLIQUID_VENUE
}
145
146 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
147 let instruments = self
148 .http_client
149 .request_instruments()
150 .await
151 .context("failed to fetch instruments during bootstrap")?;
152
153 let mut instruments_map = self.instruments.write().unwrap();
154 let mut coin_map = self.coin_to_instrument_id.write().unwrap();
155
156 for instrument in &instruments {
157 let instrument_id = instrument.id();
158 instruments_map.insert(instrument_id, instrument.clone());
159
160 let coin = instrument.raw_symbol().inner();
163 if instrument_id.symbol.as_str().starts_with("BTCUSD") {
164 log::warn!(
165 "DEBUG bootstrap BTCUSD: instrument_id={}, raw_symbol={}, coin={}",
166 instrument_id,
167 instrument.raw_symbol(),
168 coin
169 );
170 }
171 coin_map.insert(coin, instrument_id);
172
173 self.ws_client.cache_instrument(instrument.clone());
174 }
175
176 log::info!(
177 "Bootstrapped {} instruments with {} coin mappings",
178 instruments_map.len(),
179 coin_map.len()
180 );
181 Ok(instruments)
182 }
183
184 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
185 let mut ws_client = self.ws_client.clone();
187
188 ws_client
189 .connect()
190 .await
191 .context("failed to connect to Hyperliquid WebSocket")?;
192
193 let _data_sender = self.data_sender.clone();
194 let _instruments = Arc::clone(&self.instruments);
195 let _coin_to_instrument_id = Arc::clone(&self.coin_to_instrument_id);
196 let _venue = self.venue();
197 let _clock = self.clock;
198 let cancellation_token = self.cancellation_token.clone();
199
200 let task = get_runtime().spawn(async move {
201 log::info!("Hyperliquid WebSocket consumption loop started");
202
203 loop {
204 tokio::select! {
205 () = cancellation_token.cancelled() => {
206 log::info!("WebSocket consumption loop cancelled");
207 break;
208 }
209 msg_opt = ws_client.next_event() => {
210 if let Some(msg) = msg_opt {
211 match msg {
212 NautilusWsMessage::Trades(_)
214 | NautilusWsMessage::Quote(_)
215 | NautilusWsMessage::Deltas(_)
216 | NautilusWsMessage::Candle(_)
217 | NautilusWsMessage::MarkPrice(_)
218 | NautilusWsMessage::IndexPrice(_)
219 | NautilusWsMessage::FundingRate(_) => {}
220 NautilusWsMessage::Reconnected => {
221 log::info!("WebSocket reconnected");
222 }
223 NautilusWsMessage::Error(e) => {
224 log::error!("WebSocket error: {e}");
225 }
226 NautilusWsMessage::ExecutionReports(_) => {
227 }
229 }
230 } else {
231 log::warn!("WebSocket next_event returned None, connection may be closed");
233 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
234 }
235 }
236 }
237 }
238
239 log::info!("Hyperliquid WebSocket consumption loop finished");
240 });
241
242 self.tasks.push(task);
243 log::info!("WebSocket consumption task spawned");
244
245 Ok(())
246 }
247
248 #[allow(dead_code)]
249 fn handle_ws_message(
250 msg: HyperliquidWsMessage,
251 ws_client: &HyperliquidWebSocketClient,
252 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
253 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
254 coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
255 _venue: Venue,
256 clock: &'static AtomicTime,
257 ) {
258 match msg {
259 HyperliquidWsMessage::Bbo { data } => {
260 let coin = data.coin;
261 log::debug!("Received BBO message for coin: {coin}");
262
263 let coin_map = coin_to_instrument_id.read().unwrap();
266 let instrument_id = coin_map.get(&data.coin);
267
268 if let Some(&instrument_id) = instrument_id {
269 let instruments_map = instruments.read().unwrap();
270 if let Some(instrument) = instruments_map.get(&instrument_id) {
271 let ts_init = clock.get_time_ns();
272
273 match parse_ws_quote_tick(&data, instrument, ts_init) {
274 Ok(quote_tick) => {
275 log::debug!(
276 "Parsed quote tick for {}: bid={}, ask={}",
277 data.coin,
278 quote_tick.bid_price,
279 quote_tick.ask_price
280 );
281 if let Err(e) =
282 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
283 {
284 log::error!("Failed to send quote tick: {e}");
285 }
286 }
287 Err(e) => {
288 log::error!("Failed to parse quote tick for {}: {e}", data.coin);
289 }
290 }
291 }
292 } else {
293 log::warn!(
294 "Received BBO for unknown coin: {} (no matching instrument found)",
295 data.coin
296 );
297 }
298 }
299 HyperliquidWsMessage::Trades { data } => {
300 let count = data.len();
301 log::debug!("Received {count} trade(s)");
302
303 for trade_data in data {
305 let coin = trade_data.coin;
306 let coin_map = coin_to_instrument_id.read().unwrap();
307
308 if let Some(&instrument_id) = coin_map.get(&coin) {
309 let instruments_map = instruments.read().unwrap();
310 if let Some(instrument) = instruments_map.get(&instrument_id) {
311 let ts_init = clock.get_time_ns();
312
313 match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
314 Ok(trade_tick) => {
315 if let Err(e) =
316 data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
317 {
318 log::error!("Failed to send trade tick: {e}");
319 }
320 }
321 Err(e) => {
322 log::error!("Failed to parse trade tick for {coin}: {e}");
323 }
324 }
325 }
326 } else {
327 log::warn!("Received trade for unknown coin: {coin}");
328 }
329 }
330 }
331 HyperliquidWsMessage::L2Book { data } => {
332 let coin = data.coin;
333 log::debug!("Received L2 book update for coin: {coin}");
334
335 let coin_map = coin_to_instrument_id.read().unwrap();
336 if let Some(&instrument_id) = coin_map.get(&data.coin) {
337 let instruments_map = instruments.read().unwrap();
338 if let Some(instrument) = instruments_map.get(&instrument_id) {
339 let ts_init = clock.get_time_ns();
340
341 match parse_ws_order_book_deltas(&data, instrument, ts_init) {
342 Ok(deltas) => {
343 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
344 OrderBookDeltas_API::new(deltas),
345 ))) {
346 log::error!("Failed to send order book deltas: {e}");
347 }
348 }
349 Err(e) => {
350 log::error!(
351 "Failed to parse order book deltas for {}: {e}",
352 data.coin
353 );
354 }
355 }
356 }
357 } else {
358 log::warn!("Received L2 book for unknown coin: {coin}");
359 }
360 }
361 HyperliquidWsMessage::Candle { data } => {
362 let coin = &data.s;
363 let interval = &data.i;
364 log::debug!("Received candle for {coin}:{interval}");
365
366 if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
367 let coin = Ustr::from(&data.s);
368 let coin_map = coin_to_instrument_id.read().unwrap();
369
370 if let Some(&instrument_id) = coin_map.get(&coin) {
371 let instruments_map = instruments.read().unwrap();
372 if let Some(instrument) = instruments_map.get(&instrument_id) {
373 let ts_init = clock.get_time_ns();
374
375 match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
376 Ok(bar) => {
377 if let Err(e) =
378 data_sender.send(DataEvent::Data(Data::Bar(bar)))
379 {
380 log::error!("Failed to send bar data: {e}");
381 }
382 }
383 Err(e) => {
384 log::error!("Failed to parse candle for {coin}: {e}");
385 }
386 }
387 }
388 } else {
389 log::warn!("Received candle for unknown coin: {coin}");
390 }
391 } else {
392 log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
393 }
394 }
395 _ => {
396 log::trace!("Received unhandled WebSocket message: {msg:?}");
398 }
399 }
400 }
401
402 fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
403 let instruments = self.instruments.read().unwrap();
404 instruments
405 .get(instrument_id)
406 .cloned()
407 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
408 }
409}
410
411impl HyperliquidDataClient {
412 #[allow(dead_code)]
413 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
414 if let Err(e) = sender.send(DataEvent::Data(data)) {
415 log::error!("Failed to emit data event: {e}");
416 }
417 }
418}
419
420#[async_trait::async_trait(?Send)]
421impl DataClient for HyperliquidDataClient {
/// Returns the client ID this data client was created with.
fn client_id(&self) -> ClientId {
    self.client_id
}
425
/// Returns the venue served by this client; always `Some`.
fn venue(&self) -> Option<Venue> {
    Some(self.venue())
}
429
430 fn start(&mut self) -> anyhow::Result<()> {
431 log::info!(
432 "Starting Hyperliquid data client: client_id={}, is_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
433 self.client_id,
434 self.config.is_testnet,
435 self.config.http_proxy_url,
436 self.config.ws_proxy_url,
437 );
438 Ok(())
439 }
440
/// Cancels the cancellation token and clears the connected flag.
///
/// Spawned tasks are not awaited and the WebSocket client is not disconnected here.
/// Because the flag is cleared, a later `disconnect` call returns early.
fn stop(&mut self) -> anyhow::Result<()> {
    log::info!("Stopping Hyperliquid data client {}", self.client_id);
    self.cancellation_token.cancel();
    self.is_connected.store(false, Ordering::Relaxed);
    Ok(())
}
447
448 fn reset(&mut self) -> anyhow::Result<()> {
449 log::debug!("Resetting Hyperliquid data client {}", self.client_id);
450 self.is_connected.store(false, Ordering::Relaxed);
451 self.cancellation_token = CancellationToken::new();
452 self.tasks.clear();
453 Ok(())
454 }
455
/// Logs the disposal and delegates to `stop`.
fn dispose(&mut self) -> anyhow::Result<()> {
    log::debug!("Disposing Hyperliquid data client {}", self.client_id);
    self.stop()
}
460
/// Returns the current value of the connected flag (loaded with `Acquire` ordering).
fn is_connected(&self) -> bool {
    self.is_connected.load(Ordering::Acquire)
}
464
/// Returns the negation of `is_connected`.
fn is_disconnected(&self) -> bool {
    !self.is_connected()
}
468
469 async fn connect(&mut self) -> anyhow::Result<()> {
470 if self.is_connected() {
471 return Ok(());
472 }
473
474 let _instruments = self
476 .bootstrap_instruments()
477 .await
478 .context("failed to bootstrap instruments")?;
479
480 self.spawn_ws()
482 .await
483 .context("failed to spawn WebSocket client")?;
484
485 self.is_connected.store(true, Ordering::Relaxed);
486 log::info!("Connected: client_id={}", self.client_id);
487
488 Ok(())
489 }
490
491 async fn disconnect(&mut self) -> anyhow::Result<()> {
492 if !self.is_connected() {
493 return Ok(());
494 }
495
496 self.cancellation_token.cancel();
498
499 for task in self.tasks.drain(..) {
501 if let Err(e) = task.await {
502 log::error!("Error waiting for task to complete: {e}");
503 }
504 }
505
506 if let Err(e) = self.ws_client.disconnect().await {
508 log::error!("Error disconnecting WebSocket client: {e}");
509 }
510
511 {
513 let mut instruments = self.instruments.write().unwrap();
514 instruments.clear();
515 }
516
517 self.is_connected.store(false, Ordering::Relaxed);
518 log::info!("Disconnected: client_id={}", self.client_id);
519
520 Ok(())
521 }
522
523 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
524 log::debug!("Requesting all instruments");
525
526 let instruments = {
527 let instruments_map = self.instruments.read().unwrap();
528 instruments_map.values().cloned().collect()
529 };
530
531 let response = DataResponse::Instruments(InstrumentsResponse::new(
532 request.request_id,
533 request.client_id.unwrap_or(self.client_id),
534 self.venue(),
535 instruments,
536 datetime_to_unix_nanos(request.start),
537 datetime_to_unix_nanos(request.end),
538 self.clock.get_time_ns(),
539 request.params.clone(),
540 ));
541
542 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
543 log::error!("Failed to send instruments response: {e}");
544 }
545
546 Ok(())
547 }
548
549 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
550 log::debug!("Requesting instrument: {}", request.instrument_id);
551
552 let instrument = self.get_instrument(&request.instrument_id)?;
553
554 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
555 request.request_id,
556 request.client_id.unwrap_or(self.client_id),
557 instrument.id(),
558 instrument,
559 datetime_to_unix_nanos(request.start),
560 datetime_to_unix_nanos(request.end),
561 self.clock.get_time_ns(),
562 request.params.clone(),
563 )));
564
565 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
566 log::error!("Failed to send instrument response: {e}");
567 }
568
569 Ok(())
570 }
571
572 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
573 log::debug!("Requesting bars for {}", request.bar_type);
574
575 let http = self.http_client.clone();
576 let sender = self.data_sender.clone();
577 let bar_type = request.bar_type;
578 let start = request.start;
579 let end = request.end;
580 let limit = request.limit.map(|n| n.get() as u32);
581 let request_id = request.request_id;
582 let client_id = request.client_id.unwrap_or(self.client_id);
583 let params = request.params.clone();
584 let clock = self.clock;
585 let start_nanos = datetime_to_unix_nanos(start);
586 let end_nanos = datetime_to_unix_nanos(end);
587 let instruments = Arc::clone(&self.instruments);
588
589 get_runtime().spawn(async move {
590 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
591 Ok(bars) => {
592 let response = DataResponse::Bars(BarsResponse::new(
593 request_id,
594 client_id,
595 bar_type,
596 bars,
597 start_nanos,
598 end_nanos,
599 clock.get_time_ns(),
600 params,
601 ));
602 if let Err(e) = sender.send(DataEvent::Response(response)) {
603 log::error!("Failed to send bars response: {e}");
604 }
605 }
606 Err(e) => log::error!("Bar request failed: {e:?}"),
607 }
608 });
609
610 Ok(())
611 }
612
613 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
614 log::debug!("Requesting trades for {}", request.instrument_id);
615
616 log::warn!(
621 "Historical trade data not available via REST on Hyperliquid for {}",
622 request.instrument_id
623 );
624
625 let trades = Vec::new();
626
627 let response = DataResponse::Trades(TradesResponse::new(
628 request.request_id,
629 request.client_id.unwrap_or(self.client_id),
630 request.instrument_id,
631 trades,
632 datetime_to_unix_nanos(request.start),
633 datetime_to_unix_nanos(request.end),
634 self.clock.get_time_ns(),
635 request.params.clone(),
636 ));
637
638 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
639 log::error!("Failed to send trades response: {e}");
640 }
641
642 Ok(())
643 }
644
645 fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
646 log::debug!("Subscribing to trades: {}", subscription.instrument_id);
647
648 let ws = self.ws_client.clone();
649 let instrument_id = subscription.instrument_id;
650
651 get_runtime().spawn(async move {
652 if let Err(e) = ws.subscribe_trades(instrument_id).await {
653 log::error!("Failed to subscribe to trades: {e:?}");
654 }
655 });
656
657 Ok(())
658 }
659
660 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
661 log::debug!(
662 "Unsubscribing from trades: {}",
663 unsubscription.instrument_id
664 );
665
666 let ws = self.ws_client.clone();
667 let instrument_id = unsubscription.instrument_id;
668
669 get_runtime().spawn(async move {
670 if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
671 log::error!("Failed to unsubscribe from trades: {e:?}");
672 }
673 });
674
675 log::info!(
676 "Unsubscribed from trades for {}",
677 unsubscription.instrument_id
678 );
679
680 Ok(())
681 }
682
683 fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
684 log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
685
686 if subscription.book_type != BookType::L2_MBP {
687 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
688 }
689
690 let ws = self.ws_client.clone();
691 let instrument_id = subscription.instrument_id;
692
693 get_runtime().spawn(async move {
694 if let Err(e) = ws.subscribe_book(instrument_id).await {
695 log::error!("Failed to subscribe to book deltas: {e:?}");
696 }
697 });
698
699 Ok(())
700 }
701
702 fn unsubscribe_book_deltas(
703 &mut self,
704 unsubscription: &UnsubscribeBookDeltas,
705 ) -> anyhow::Result<()> {
706 log::debug!(
707 "Unsubscribing from book deltas: {}",
708 unsubscription.instrument_id
709 );
710
711 let ws = self.ws_client.clone();
712 let instrument_id = unsubscription.instrument_id;
713
714 get_runtime().spawn(async move {
715 if let Err(e) = ws.unsubscribe_book(instrument_id).await {
716 log::error!("Failed to unsubscribe from book deltas: {e:?}");
717 }
718 });
719
720 Ok(())
721 }
722
723 fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
724 log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
725
726 let ws = self.ws_client.clone();
727 let instrument_id = subscription.instrument_id;
728
729 get_runtime().spawn(async move {
730 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
731 log::error!("Failed to subscribe to quotes: {e:?}");
732 }
733 });
734
735 Ok(())
736 }
737
738 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
739 log::debug!(
740 "Unsubscribing from quotes: {}",
741 unsubscription.instrument_id
742 );
743
744 let ws = self.ws_client.clone();
745 let instrument_id = unsubscription.instrument_id;
746
747 get_runtime().spawn(async move {
748 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
749 log::error!("Failed to unsubscribe from quotes: {e:?}");
750 }
751 });
752
753 log::info!(
754 "Unsubscribed from quotes for {}",
755 unsubscription.instrument_id
756 );
757
758 Ok(())
759 }
760
761 fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
762 log::debug!("Subscribing to bars: {}", subscription.bar_type);
763
764 let instruments = self.instruments.read().unwrap();
765 let instrument_id = subscription.bar_type.instrument_id();
766 if !instruments.contains_key(&instrument_id) {
767 anyhow::bail!("Instrument {instrument_id} not found");
768 }
769
770 drop(instruments);
771
772 let bar_type = subscription.bar_type;
773 let ws = self.ws_client.clone();
774
775 get_runtime().spawn(async move {
776 if let Err(e) = ws.subscribe_bars(bar_type).await {
777 log::error!("Failed to subscribe to bars: {e:?}");
778 }
779 });
780
781 log::info!("Subscribed to bars for {}", subscription.bar_type);
782
783 Ok(())
784 }
785
786 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
787 log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
788
789 let bar_type = unsubscription.bar_type;
790 let ws = self.ws_client.clone();
791
792 get_runtime().spawn(async move {
793 if let Err(e) = ws.unsubscribe_bars(bar_type).await {
794 log::error!("Failed to unsubscribe from bars: {e:?}");
795 }
796 });
797
798 log::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
799
800 Ok(())
801 }
802}
803
804pub(crate) fn candle_to_bar(
805 candle: &HyperliquidCandle,
806 bar_type: BarType,
807 price_precision: u8,
808 size_precision: u8,
809) -> anyhow::Result<Bar> {
810 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
811 let ts_event = ts_init;
812
813 let open = candle.open.parse::<f64>().context("parse open price")?;
814 let high = candle.high.parse::<f64>().context("parse high price")?;
815 let low = candle.low.parse::<f64>().context("parse low price")?;
816 let close = candle.close.parse::<f64>().context("parse close price")?;
817 let volume = candle.volume.parse::<f64>().context("parse volume")?;
818
819 Ok(Bar::new(
820 bar_type,
821 Price::new(open, price_precision),
822 Price::new(high, price_precision),
823 Price::new(low, price_precision),
824 Price::new(close, price_precision),
825 Quantity::new(volume, size_precision),
826 ts_event,
827 ts_init,
828 ))
829}
830
831async fn request_bars_from_http(
833 http_client: HyperliquidHttpClient,
834 bar_type: BarType,
835 start: Option<DateTime<Utc>>,
836 end: Option<DateTime<Utc>>,
837 limit: Option<u32>,
838 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
839) -> anyhow::Result<Vec<Bar>> {
840 let instrument_id = bar_type.instrument_id();
842 let instrument = {
843 let guard = instruments.read().unwrap();
844 guard
845 .get(&instrument_id)
846 .cloned()
847 .context("instrument not found in cache")?
848 };
849
850 let price_precision = instrument.price_precision();
851 let size_precision = instrument.size_precision();
852
853 let coin = instrument_id
855 .symbol
856 .as_str()
857 .split('-')
858 .next()
859 .context("invalid instrument symbol")?;
860
861 let interval = bar_type_to_interval(&bar_type)?;
862
863 let now = Utc::now();
865 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
866 let start_time = if let Some(start) = start {
867 start.timestamp_millis() as u64
868 } else {
869 let spec = bar_type.spec();
871 let step_ms = match spec.aggregation {
872 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
873 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
874 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
875 _ => 60_000,
876 };
877 end_time.saturating_sub(1000 * step_ms)
878 };
879
880 let candles = http_client
881 .info_candle_snapshot(coin, interval, start_time, end_time)
882 .await
883 .context("failed to fetch candle snapshot from Hyperliquid")?;
884
885 let mut bars: Vec<Bar> = candles
886 .iter()
887 .filter_map(|candle| {
888 candle_to_bar(candle, bar_type, price_precision, size_precision)
889 .map_err(|e| {
890 log::warn!("Failed to convert candle to bar: {e}");
891 e
892 })
893 .ok()
894 })
895 .collect();
896
897 if let Some(limit) = limit
898 && bars.len() > limit as usize
899 {
900 bars = bars.into_iter().take(limit as usize).collect();
901 }
902
903 log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
904 Ok(bars)
905}