1use std::sync::{
17 Arc, RwLock,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 live::{runner::get_data_event_sender, runtime::get_runtime},
26 messages::{
27 DataEvent,
28 data::{
29 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
30 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
31 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
32 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
33 UnsubscribeQuotes, UnsubscribeTrades,
34 },
35 },
36};
37use nautilus_core::{
38 UnixNanos,
39 datetime::datetime_to_unix_nanos,
40 time::{AtomicTime, get_atomic_clock_realtime},
41};
42use nautilus_data::client::DataClient;
43use nautilus_model::{
44 data::{Bar, BarType, Data, OrderBookDeltas_API},
45 enums::{BarAggregation, BookType},
46 identifiers::{ClientId, InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 types::{Price, Quantity},
49};
50use tokio::task::JoinHandle;
51use tokio_util::sync::CancellationToken;
52use ustr::Ustr;
53
54use crate::{
55 common::{HyperliquidProductType, consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
56 config::HyperliquidDataClientConfig,
57 http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
58 websocket::{
59 client::HyperliquidWebSocketClient,
60 messages::{HyperliquidWsMessage, NautilusWsMessage},
61 parse::{
62 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
63 },
64 },
65};
66
67#[derive(Debug)]
68pub struct HyperliquidDataClient {
69 client_id: ClientId,
70 #[allow(dead_code)]
71 config: HyperliquidDataClientConfig,
72 http_client: HyperliquidHttpClient,
73 ws_client: HyperliquidWebSocketClient,
74 is_connected: AtomicBool,
75 cancellation_token: CancellationToken,
76 tasks: Vec<JoinHandle<()>>,
77 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
78 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
79 coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
82 clock: &'static AtomicTime,
83 #[allow(dead_code)]
84 instrument_refresh_active: bool,
85}
86
87impl HyperliquidDataClient {
88 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
94 let clock = get_atomic_clock_realtime();
95 let data_sender = get_data_event_sender();
96
97 let http_client = if let Some(private_key_str) = &config.private_key {
98 let secrets = crate::common::credential::Secrets {
99 private_key: crate::common::credential::EvmPrivateKey::new(
100 private_key_str.clone(),
101 )?,
102 is_testnet: config.is_testnet,
103 vault_address: None,
104 };
105 HyperliquidHttpClient::with_credentials(
106 &secrets,
107 config.http_timeout_secs,
108 config.http_proxy_url.clone(),
109 )?
110 } else {
111 HyperliquidHttpClient::new(
112 config.is_testnet,
113 config.http_timeout_secs,
114 config.http_proxy_url.clone(),
115 )?
116 };
117
118 let ws_client = HyperliquidWebSocketClient::new(
121 None,
122 config.is_testnet,
123 HyperliquidProductType::Perp,
124 None,
125 );
126
127 Ok(Self {
128 client_id,
129 config,
130 http_client,
131 ws_client,
132 is_connected: AtomicBool::new(false),
133 cancellation_token: CancellationToken::new(),
134 tasks: Vec::new(),
135 data_sender,
136 instruments: Arc::new(RwLock::new(AHashMap::new())),
137 coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
138 clock,
139 instrument_refresh_active: false,
140 })
141 }
142
143 fn venue(&self) -> Venue {
144 *HYPERLIQUID_VENUE
145 }
146
147 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
148 let instruments = self
149 .http_client
150 .request_instruments()
151 .await
152 .context("failed to fetch instruments during bootstrap")?;
153
154 let mut instruments_map = self.instruments.write().unwrap();
155 let mut coin_map = self.coin_to_instrument_id.write().unwrap();
156
157 for instrument in &instruments {
158 let instrument_id = instrument.id();
159 instruments_map.insert(instrument_id, instrument.clone());
160
161 let coin = instrument.raw_symbol().inner();
164 if instrument_id.symbol.as_str().starts_with("BTCUSD") {
165 log::warn!(
166 "DEBUG bootstrap BTCUSD: instrument_id={}, raw_symbol={}, coin={}",
167 instrument_id,
168 instrument.raw_symbol(),
169 coin
170 );
171 }
172 coin_map.insert(coin, instrument_id);
173
174 self.ws_client.cache_instrument(instrument.clone());
175 }
176
177 tracing::info!(
178 "Bootstrapped {} instruments with {} coin mappings",
179 instruments_map.len(),
180 coin_map.len()
181 );
182 Ok(instruments)
183 }
184
185 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
186 let mut ws_client = self.ws_client.clone();
188
189 ws_client
190 .connect()
191 .await
192 .context("failed to connect to Hyperliquid WebSocket")?;
193
194 let _data_sender = self.data_sender.clone();
195 let _instruments = Arc::clone(&self.instruments);
196 let _coin_to_instrument_id = Arc::clone(&self.coin_to_instrument_id);
197 let _venue = self.venue();
198 let _clock = self.clock;
199 let cancellation_token = self.cancellation_token.clone();
200
201 let task = get_runtime().spawn(async move {
202 tracing::info!("Hyperliquid WebSocket consumption loop started");
203
204 loop {
205 tokio::select! {
206 _ = cancellation_token.cancelled() => {
207 tracing::info!("WebSocket consumption loop cancelled");
208 break;
209 }
210 msg_opt = ws_client.next_event() => {
211 if let Some(msg) = msg_opt {
212 match msg {
213 NautilusWsMessage::Trades(_)
215 | NautilusWsMessage::Quote(_)
216 | NautilusWsMessage::Deltas(_)
217 | NautilusWsMessage::Candle(_)
218 | NautilusWsMessage::MarkPrice(_)
219 | NautilusWsMessage::IndexPrice(_)
220 | NautilusWsMessage::FundingRate(_) => {}
221 NautilusWsMessage::Reconnected => {
222 tracing::info!("WebSocket reconnected");
223 }
224 NautilusWsMessage::Error(e) => {
225 tracing::error!("WebSocket error: {e}");
226 }
227 NautilusWsMessage::ExecutionReports(_) => {
228 }
230 }
231 } else {
232 tracing::warn!("WebSocket next_event returned None, connection may be closed");
234 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
235 }
236 }
237 }
238 }
239
240 tracing::info!("Hyperliquid WebSocket consumption loop finished");
241 });
242
243 self.tasks.push(task);
244 tracing::info!("WebSocket consumption task spawned");
245
246 Ok(())
247 }
248
249 #[allow(dead_code)]
250 fn handle_ws_message(
251 msg: HyperliquidWsMessage,
252 ws_client: &HyperliquidWebSocketClient,
253 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
254 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
255 coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
256 _venue: Venue,
257 clock: &'static AtomicTime,
258 ) {
259 match msg {
260 HyperliquidWsMessage::Bbo { data } => {
261 let coin = data.coin;
262 tracing::debug!("Received BBO message for coin: {coin}");
263
264 let coin_map = coin_to_instrument_id.read().unwrap();
267 let instrument_id = coin_map.get(&data.coin);
268
269 if let Some(&instrument_id) = instrument_id {
270 let instruments_map = instruments.read().unwrap();
271 if let Some(instrument) = instruments_map.get(&instrument_id) {
272 let ts_init = clock.get_time_ns();
273
274 match parse_ws_quote_tick(&data, instrument, ts_init) {
275 Ok(quote_tick) => {
276 tracing::debug!(
277 "Parsed quote tick for {}: bid={}, ask={}",
278 data.coin,
279 quote_tick.bid_price,
280 quote_tick.ask_price
281 );
282 if let Err(e) =
283 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
284 {
285 tracing::error!("Failed to send quote tick: {e}");
286 }
287 }
288 Err(e) => {
289 tracing::error!(
290 "Failed to parse quote tick for {}: {e}",
291 data.coin
292 );
293 }
294 }
295 }
296 } else {
297 tracing::warn!(
298 "Received BBO for unknown coin: {} (no matching instrument found)",
299 data.coin
300 );
301 }
302 }
303 HyperliquidWsMessage::Trades { data } => {
304 let count = data.len();
305 tracing::debug!("Received {count} trade(s)");
306
307 for trade_data in data {
309 let coin = trade_data.coin;
310 let coin_map = coin_to_instrument_id.read().unwrap();
311
312 if let Some(&instrument_id) = coin_map.get(&coin) {
313 let instruments_map = instruments.read().unwrap();
314 if let Some(instrument) = instruments_map.get(&instrument_id) {
315 let ts_init = clock.get_time_ns();
316
317 match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
318 Ok(trade_tick) => {
319 if let Err(e) =
320 data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
321 {
322 tracing::error!("Failed to send trade tick: {e}");
323 }
324 }
325 Err(e) => {
326 tracing::error!("Failed to parse trade tick for {coin}: {e}");
327 }
328 }
329 }
330 } else {
331 tracing::warn!("Received trade for unknown coin: {coin}");
332 }
333 }
334 }
335 HyperliquidWsMessage::L2Book { data } => {
336 let coin = data.coin;
337 tracing::debug!("Received L2 book update for coin: {coin}");
338
339 let coin_map = coin_to_instrument_id.read().unwrap();
340 if let Some(&instrument_id) = coin_map.get(&data.coin) {
341 let instruments_map = instruments.read().unwrap();
342 if let Some(instrument) = instruments_map.get(&instrument_id) {
343 let ts_init = clock.get_time_ns();
344
345 match parse_ws_order_book_deltas(&data, instrument, ts_init) {
346 Ok(deltas) => {
347 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
348 OrderBookDeltas_API::new(deltas),
349 ))) {
350 tracing::error!("Failed to send order book deltas: {e}");
351 }
352 }
353 Err(e) => {
354 tracing::error!(
355 "Failed to parse order book deltas for {}: {e}",
356 data.coin
357 );
358 }
359 }
360 }
361 } else {
362 tracing::warn!("Received L2 book for unknown coin: {coin}");
363 }
364 }
365 HyperliquidWsMessage::Candle { data } => {
366 let coin = &data.s;
367 let interval = &data.i;
368 tracing::debug!("Received candle for {coin}:{interval}");
369
370 if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
371 let coin = Ustr::from(&data.s);
372 let coin_map = coin_to_instrument_id.read().unwrap();
373
374 if let Some(&instrument_id) = coin_map.get(&coin) {
375 let instruments_map = instruments.read().unwrap();
376 if let Some(instrument) = instruments_map.get(&instrument_id) {
377 let ts_init = clock.get_time_ns();
378
379 match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
380 Ok(bar) => {
381 if let Err(e) =
382 data_sender.send(DataEvent::Data(Data::Bar(bar)))
383 {
384 tracing::error!("Failed to send bar data: {e}");
385 }
386 }
387 Err(e) => {
388 tracing::error!("Failed to parse candle for {coin}: {e}");
389 }
390 }
391 }
392 } else {
393 tracing::warn!("Received candle for unknown coin: {coin}");
394 }
395 } else {
396 tracing::debug!("Received candle for {coin}:{interval} but no BarType tracked");
397 }
398 }
399 _ => {
400 tracing::trace!("Received unhandled WebSocket message: {:?}", msg);
402 }
403 }
404 }
405
406 fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
407 let instruments = self.instruments.read().unwrap();
408 instruments
409 .get(instrument_id)
410 .cloned()
411 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
412 }
413}
414
415impl HyperliquidDataClient {
416 #[allow(dead_code)]
417 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
418 if let Err(e) = sender.send(DataEvent::Data(data)) {
419 tracing::error!("Failed to emit data event: {e}");
420 }
421 }
422}
423
424#[async_trait::async_trait(?Send)]
425impl DataClient for HyperliquidDataClient {
426 fn client_id(&self) -> ClientId {
427 self.client_id
428 }
429
430 fn venue(&self) -> Option<Venue> {
431 Some(self.venue())
432 }
433
434 fn start(&mut self) -> anyhow::Result<()> {
435 tracing::info!(
436 client_id = %self.client_id,
437 is_testnet = self.config.is_testnet,
438 http_proxy_url = ?self.config.http_proxy_url,
439 ws_proxy_url = ?self.config.ws_proxy_url,
440 "Starting Hyperliquid data client"
441 );
442 Ok(())
443 }
444
445 fn stop(&mut self) -> anyhow::Result<()> {
446 tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
447 self.cancellation_token.cancel();
448 self.is_connected.store(false, Ordering::Relaxed);
449 Ok(())
450 }
451
452 fn reset(&mut self) -> anyhow::Result<()> {
453 tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
454 self.is_connected.store(false, Ordering::Relaxed);
455 self.cancellation_token = CancellationToken::new();
456 self.tasks.clear();
457 Ok(())
458 }
459
460 fn dispose(&mut self) -> anyhow::Result<()> {
461 tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
462 self.stop()
463 }
464
465 fn is_connected(&self) -> bool {
466 self.is_connected.load(Ordering::Acquire)
467 }
468
469 fn is_disconnected(&self) -> bool {
470 !self.is_connected()
471 }
472
473 async fn connect(&mut self) -> anyhow::Result<()> {
474 if self.is_connected() {
475 return Ok(());
476 }
477
478 let _instruments = self
480 .bootstrap_instruments()
481 .await
482 .context("failed to bootstrap instruments")?;
483
484 self.spawn_ws()
486 .await
487 .context("failed to spawn WebSocket client")?;
488
489 self.is_connected.store(true, Ordering::Relaxed);
490 tracing::info!(client_id = %self.client_id, "Connected");
491
492 Ok(())
493 }
494
495 async fn disconnect(&mut self) -> anyhow::Result<()> {
496 if !self.is_connected() {
497 return Ok(());
498 }
499
500 self.cancellation_token.cancel();
502
503 for task in self.tasks.drain(..) {
505 if let Err(e) = task.await {
506 tracing::error!("Error waiting for task to complete: {e}");
507 }
508 }
509
510 if let Err(e) = self.ws_client.disconnect().await {
512 tracing::error!("Error disconnecting WebSocket client: {e}");
513 }
514
515 {
517 let mut instruments = self.instruments.write().unwrap();
518 instruments.clear();
519 }
520
521 self.is_connected.store(false, Ordering::Relaxed);
522 tracing::info!(client_id = %self.client_id, "Disconnected");
523
524 Ok(())
525 }
526
527 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
528 tracing::debug!("Requesting all instruments");
529
530 let instruments = {
531 let instruments_map = self.instruments.read().unwrap();
532 instruments_map.values().cloned().collect()
533 };
534
535 let response = DataResponse::Instruments(InstrumentsResponse::new(
536 request.request_id,
537 request.client_id.unwrap_or(self.client_id),
538 self.venue(),
539 instruments,
540 datetime_to_unix_nanos(request.start),
541 datetime_to_unix_nanos(request.end),
542 self.clock.get_time_ns(),
543 request.params.clone(),
544 ));
545
546 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
547 tracing::error!("Failed to send instruments response: {e}");
548 }
549
550 Ok(())
551 }
552
553 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
554 tracing::debug!("Requesting instrument: {}", request.instrument_id);
555
556 let instrument = self.get_instrument(&request.instrument_id)?;
557
558 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
559 request.request_id,
560 request.client_id.unwrap_or(self.client_id),
561 instrument.id(),
562 instrument,
563 datetime_to_unix_nanos(request.start),
564 datetime_to_unix_nanos(request.end),
565 self.clock.get_time_ns(),
566 request.params.clone(),
567 )));
568
569 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
570 tracing::error!("Failed to send instrument response: {e}");
571 }
572
573 Ok(())
574 }
575
576 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
577 tracing::debug!("Requesting bars for {}", request.bar_type);
578
579 let http = self.http_client.clone();
580 let sender = self.data_sender.clone();
581 let bar_type = request.bar_type;
582 let start = request.start;
583 let end = request.end;
584 let limit = request.limit.map(|n| n.get() as u32);
585 let request_id = request.request_id;
586 let client_id = request.client_id.unwrap_or(self.client_id);
587 let params = request.params.clone();
588 let clock = self.clock;
589 let start_nanos = datetime_to_unix_nanos(start);
590 let end_nanos = datetime_to_unix_nanos(end);
591 let instruments = Arc::clone(&self.instruments);
592
593 get_runtime().spawn(async move {
594 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
595 Ok(bars) => {
596 let response = DataResponse::Bars(BarsResponse::new(
597 request_id,
598 client_id,
599 bar_type,
600 bars,
601 start_nanos,
602 end_nanos,
603 clock.get_time_ns(),
604 params,
605 ));
606 if let Err(e) = sender.send(DataEvent::Response(response)) {
607 tracing::error!("Failed to send bars response: {e}");
608 }
609 }
610 Err(e) => tracing::error!("Bar request failed: {e:?}"),
611 }
612 });
613
614 Ok(())
615 }
616
617 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
618 tracing::debug!("Requesting trades for {}", request.instrument_id);
619
620 tracing::warn!(
625 "Historical trade data not available via REST on Hyperliquid for {}",
626 request.instrument_id
627 );
628
629 let trades = Vec::new();
630
631 let response = DataResponse::Trades(TradesResponse::new(
632 request.request_id,
633 request.client_id.unwrap_or(self.client_id),
634 request.instrument_id,
635 trades,
636 datetime_to_unix_nanos(request.start),
637 datetime_to_unix_nanos(request.end),
638 self.clock.get_time_ns(),
639 request.params.clone(),
640 ));
641
642 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
643 tracing::error!("Failed to send trades response: {e}");
644 }
645
646 Ok(())
647 }
648
649 fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
650 tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
651
652 let ws = self.ws_client.clone();
653 let instrument_id = subscription.instrument_id;
654
655 get_runtime().spawn(async move {
656 if let Err(e) = ws.subscribe_trades(instrument_id).await {
657 tracing::error!("Failed to subscribe to trades: {e:?}");
658 }
659 });
660
661 Ok(())
662 }
663
664 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
665 tracing::debug!(
666 "Unsubscribing from trades: {}",
667 unsubscription.instrument_id
668 );
669
670 let ws = self.ws_client.clone();
671 let instrument_id = unsubscription.instrument_id;
672
673 get_runtime().spawn(async move {
674 if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
675 tracing::error!("Failed to unsubscribe from trades: {e:?}");
676 }
677 });
678
679 tracing::info!(
680 "Unsubscribed from trades for {}",
681 unsubscription.instrument_id
682 );
683
684 Ok(())
685 }
686
687 fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
688 tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
689
690 if subscription.book_type != BookType::L2_MBP {
691 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
692 }
693
694 let ws = self.ws_client.clone();
695 let instrument_id = subscription.instrument_id;
696
697 get_runtime().spawn(async move {
698 if let Err(e) = ws.subscribe_book(instrument_id).await {
699 tracing::error!("Failed to subscribe to book deltas: {e:?}");
700 }
701 });
702
703 Ok(())
704 }
705
706 fn unsubscribe_book_deltas(
707 &mut self,
708 unsubscription: &UnsubscribeBookDeltas,
709 ) -> anyhow::Result<()> {
710 tracing::debug!(
711 "Unsubscribing from book deltas: {}",
712 unsubscription.instrument_id
713 );
714
715 let ws = self.ws_client.clone();
716 let instrument_id = unsubscription.instrument_id;
717
718 get_runtime().spawn(async move {
719 if let Err(e) = ws.unsubscribe_book(instrument_id).await {
720 tracing::error!("Failed to unsubscribe from book deltas: {e:?}");
721 }
722 });
723
724 Ok(())
725 }
726
727 fn subscribe_book_snapshots(
728 &mut self,
729 subscription: &SubscribeBookSnapshots,
730 ) -> anyhow::Result<()> {
731 tracing::debug!(
732 "Subscribing to book snapshots: {}",
733 subscription.instrument_id
734 );
735
736 if subscription.book_type != BookType::L2_MBP {
737 anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
738 }
739
740 let ws = self.ws_client.clone();
741 let instrument_id = subscription.instrument_id;
742
743 get_runtime().spawn(async move {
744 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
745 tracing::error!("Failed to subscribe to book snapshots: {e:?}");
746 }
747 });
748
749 Ok(())
750 }
751
752 fn unsubscribe_book_snapshots(
753 &mut self,
754 unsubscription: &UnsubscribeBookSnapshots,
755 ) -> anyhow::Result<()> {
756 tracing::debug!(
757 "Unsubscribing from book snapshots: {}",
758 unsubscription.instrument_id
759 );
760
761 let ws = self.ws_client.clone();
762 let instrument_id = unsubscription.instrument_id;
763
764 get_runtime().spawn(async move {
765 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
766 tracing::error!("Failed to unsubscribe from book snapshots: {e:?}");
767 }
768 });
769
770 Ok(())
771 }
772
773 fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
774 tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
775
776 let ws = self.ws_client.clone();
777 let instrument_id = subscription.instrument_id;
778
779 get_runtime().spawn(async move {
780 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
781 tracing::error!("Failed to subscribe to quotes: {e:?}");
782 }
783 });
784
785 Ok(())
786 }
787
788 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
789 tracing::debug!(
790 "Unsubscribing from quotes: {}",
791 unsubscription.instrument_id
792 );
793
794 let ws = self.ws_client.clone();
795 let instrument_id = unsubscription.instrument_id;
796
797 get_runtime().spawn(async move {
798 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
799 tracing::error!("Failed to unsubscribe from quotes: {e:?}");
800 }
801 });
802
803 tracing::info!(
804 "Unsubscribed from quotes for {}",
805 unsubscription.instrument_id
806 );
807
808 Ok(())
809 }
810
811 fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
812 tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
813
814 let instruments = self.instruments.read().unwrap();
815 let instrument_id = subscription.bar_type.instrument_id();
816 if !instruments.contains_key(&instrument_id) {
817 anyhow::bail!("Instrument {instrument_id} not found");
818 }
819
820 drop(instruments);
821
822 let bar_type = subscription.bar_type;
823 let ws = self.ws_client.clone();
824
825 get_runtime().spawn(async move {
826 if let Err(e) = ws.subscribe_bars(bar_type).await {
827 tracing::error!("Failed to subscribe to bars: {e:?}");
828 }
829 });
830
831 tracing::info!("Subscribed to bars for {}", subscription.bar_type);
832
833 Ok(())
834 }
835
836 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
837 tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
838
839 let bar_type = unsubscription.bar_type;
840 let ws = self.ws_client.clone();
841
842 get_runtime().spawn(async move {
843 if let Err(e) = ws.unsubscribe_bars(bar_type).await {
844 tracing::error!("Failed to unsubscribe from bars: {e:?}");
845 }
846 });
847
848 tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
849
850 Ok(())
851 }
852}
853
854pub(crate) fn candle_to_bar(
855 candle: &HyperliquidCandle,
856 bar_type: BarType,
857 price_precision: u8,
858 size_precision: u8,
859) -> anyhow::Result<Bar> {
860 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
861 let ts_event = ts_init;
862
863 let open = candle.open.parse::<f64>().context("parse open price")?;
864 let high = candle.high.parse::<f64>().context("parse high price")?;
865 let low = candle.low.parse::<f64>().context("parse low price")?;
866 let close = candle.close.parse::<f64>().context("parse close price")?;
867 let volume = candle.volume.parse::<f64>().context("parse volume")?;
868
869 Ok(Bar::new(
870 bar_type,
871 Price::new(open, price_precision),
872 Price::new(high, price_precision),
873 Price::new(low, price_precision),
874 Price::new(close, price_precision),
875 Quantity::new(volume, size_precision),
876 ts_event,
877 ts_init,
878 ))
879}
880
881async fn request_bars_from_http(
883 http_client: HyperliquidHttpClient,
884 bar_type: BarType,
885 start: Option<DateTime<Utc>>,
886 end: Option<DateTime<Utc>>,
887 limit: Option<u32>,
888 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
889) -> anyhow::Result<Vec<Bar>> {
890 let instrument_id = bar_type.instrument_id();
892 let instrument = {
893 let guard = instruments.read().unwrap();
894 guard
895 .get(&instrument_id)
896 .cloned()
897 .context("instrument not found in cache")?
898 };
899
900 let price_precision = instrument.price_precision();
901 let size_precision = instrument.size_precision();
902
903 let coin = instrument_id
905 .symbol
906 .as_str()
907 .split('-')
908 .next()
909 .context("invalid instrument symbol")?;
910
911 let interval = bar_type_to_interval(&bar_type)?;
912
913 let now = Utc::now();
915 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
916 let start_time = if let Some(start) = start {
917 start.timestamp_millis() as u64
918 } else {
919 let spec = bar_type.spec();
921 let step_ms = match spec.aggregation {
922 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
923 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
924 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
925 _ => 60_000,
926 };
927 end_time.saturating_sub(1000 * step_ms)
928 };
929
930 let candles = http_client
931 .info_candle_snapshot(coin, interval, start_time, end_time)
932 .await
933 .context("failed to fetch candle snapshot from Hyperliquid")?;
934
935 let mut bars: Vec<Bar> = candles
936 .iter()
937 .filter_map(|candle| {
938 candle_to_bar(candle, bar_type, price_precision, size_precision)
939 .map_err(|e| {
940 tracing::warn!("Failed to convert candle to bar: {e}");
941 e
942 })
943 .ok()
944 })
945 .collect();
946
947 if let Some(limit) = limit
948 && bars.len() > limit as usize
949 {
950 bars = bars.into_iter().take(limit as usize).collect();
951 }
952
953 tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
954 Ok(bars)
955}