1use std::sync::{
17 Arc, RwLock,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 messages::{
26 DataEvent,
27 data::{
28 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
29 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
30 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeQuotes, SubscribeTrades,
31 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
32 UnsubscribeQuotes, UnsubscribeTrades,
33 },
34 },
35 runner::get_data_event_sender,
36};
37use nautilus_core::{
38 UnixNanos,
39 time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_data::client::DataClient;
42use nautilus_model::{
43 data::{Bar, BarType, Data, OrderBookDeltas_API},
44 enums::BarAggregation,
45 identifiers::{ClientId, InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47 types::{Price, Quantity},
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54 common::{HyperliquidProductType, consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
55 config::HyperliquidDataClientConfig,
56 http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
57 websocket::{
58 client::HyperliquidWebSocketClient,
59 messages::{HyperliquidWsMessage, NautilusWsMessage},
60 parse::{
61 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
62 },
63 },
64};
65
66#[derive(Debug)]
67pub struct HyperliquidDataClient {
68 client_id: ClientId,
69 #[allow(dead_code)]
70 config: HyperliquidDataClientConfig,
71 http_client: HyperliquidHttpClient,
72 ws_client: HyperliquidWebSocketClient,
73 is_connected: AtomicBool,
74 cancellation_token: CancellationToken,
75 tasks: Vec<JoinHandle<()>>,
76 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
77 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
78 coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
81 clock: &'static AtomicTime,
82 #[allow(dead_code)]
83 instrument_refresh_active: bool,
84}
85
86impl HyperliquidDataClient {
87 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
93 let clock = get_atomic_clock_realtime();
94 let data_sender = get_data_event_sender();
95
96 let http_client = if let Some(private_key_str) = &config.private_key {
97 let secrets = crate::common::credential::Secrets {
98 private_key: crate::common::credential::EvmPrivateKey::new(
99 private_key_str.clone(),
100 )?,
101 is_testnet: config.is_testnet,
102 vault_address: None,
103 };
104 HyperliquidHttpClient::with_credentials(
105 &secrets,
106 config.http_timeout_secs,
107 config.http_proxy_url.clone(),
108 )?
109 } else {
110 HyperliquidHttpClient::new(
111 config.is_testnet,
112 config.http_timeout_secs,
113 config.http_proxy_url.clone(),
114 )?
115 };
116
117 let ws_client = HyperliquidWebSocketClient::new(
120 None,
121 config.is_testnet,
122 HyperliquidProductType::Perp,
123 None,
124 );
125
126 Ok(Self {
127 client_id,
128 config,
129 http_client,
130 ws_client,
131 is_connected: AtomicBool::new(false),
132 cancellation_token: CancellationToken::new(),
133 tasks: Vec::new(),
134 data_sender,
135 instruments: Arc::new(RwLock::new(AHashMap::new())),
136 coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
137 clock,
138 instrument_refresh_active: false,
139 })
140 }
141
142 fn venue(&self) -> Venue {
143 *HYPERLIQUID_VENUE
144 }
145
146 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
147 let instruments = self
148 .http_client
149 .request_instruments()
150 .await
151 .context("failed to fetch instruments during bootstrap")?;
152
153 let mut instruments_map = self.instruments.write().unwrap();
154 let mut coin_map = self.coin_to_instrument_id.write().unwrap();
155
156 for instrument in &instruments {
157 let instrument_id = instrument.id();
158 instruments_map.insert(instrument_id, instrument.clone());
159
160 let coin = instrument.raw_symbol().inner();
163 if instrument_id.symbol.as_str().starts_with("BTCUSD") {
164 log::warn!(
165 "DEBUG bootstrap BTCUSD: instrument_id={}, raw_symbol={}, coin={}",
166 instrument_id,
167 instrument.raw_symbol(),
168 coin
169 );
170 }
171 coin_map.insert(coin, instrument_id);
172
173 self.ws_client.cache_instrument(instrument.clone());
174 }
175
176 tracing::info!(
177 "Bootstrapped {} instruments with {} coin mappings",
178 instruments_map.len(),
179 coin_map.len()
180 );
181 Ok(instruments)
182 }
183
184 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
185 let mut ws_client = self.ws_client.clone();
187
188 ws_client
189 .connect()
190 .await
191 .context("failed to connect to Hyperliquid WebSocket")?;
192
193 let _data_sender = self.data_sender.clone();
194 let _instruments = Arc::clone(&self.instruments);
195 let _coin_to_instrument_id = Arc::clone(&self.coin_to_instrument_id);
196 let _venue = self.venue();
197 let _clock = self.clock;
198 let cancellation_token = self.cancellation_token.clone();
199
200 let task = tokio::spawn(async move {
201 tracing::info!("Hyperliquid WebSocket consumption loop started");
202
203 loop {
204 tokio::select! {
205 _ = cancellation_token.cancelled() => {
206 tracing::info!("WebSocket consumption loop cancelled");
207 break;
208 }
209 msg_opt = ws_client.next_event() => {
210 if let Some(msg) = msg_opt {
211 match msg {
212 NautilusWsMessage::Trades(_)
214 | NautilusWsMessage::Quote(_)
215 | NautilusWsMessage::Deltas(_)
216 | NautilusWsMessage::Candle(_)
217 | NautilusWsMessage::MarkPrice(_)
218 | NautilusWsMessage::IndexPrice(_)
219 | NautilusWsMessage::FundingRate(_) => {}
220 NautilusWsMessage::Reconnected => {
221 tracing::info!("WebSocket reconnected");
222 }
223 NautilusWsMessage::Error(e) => {
224 tracing::error!("WebSocket error: {e}");
225 }
226 NautilusWsMessage::ExecutionReports(_) => {
227 }
229 }
230 } else {
231 tracing::warn!("WebSocket next_event returned None, connection may be closed");
233 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
234 }
235 }
236 }
237 }
238
239 tracing::info!("Hyperliquid WebSocket consumption loop finished");
240 });
241
242 self.tasks.push(task);
243 tracing::info!("WebSocket consumption task spawned");
244
245 Ok(())
246 }
247
248 #[allow(dead_code)]
249 fn handle_ws_message(
250 msg: HyperliquidWsMessage,
251 ws_client: &HyperliquidWebSocketClient,
252 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
253 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
254 coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
255 _venue: Venue,
256 clock: &'static AtomicTime,
257 ) {
258 match msg {
259 HyperliquidWsMessage::Bbo { data } => {
260 let coin = data.coin;
261 tracing::debug!("Received BBO message for coin: {coin}");
262
263 let coin_map = coin_to_instrument_id.read().unwrap();
266 let instrument_id = coin_map.get(&data.coin);
267
268 if let Some(&instrument_id) = instrument_id {
269 let instruments_map = instruments.read().unwrap();
270 if let Some(instrument) = instruments_map.get(&instrument_id) {
271 let ts_init = clock.get_time_ns();
272
273 match parse_ws_quote_tick(&data, instrument, ts_init) {
274 Ok(quote_tick) => {
275 tracing::debug!(
276 "Parsed quote tick for {}: bid={}, ask={}",
277 data.coin,
278 quote_tick.bid_price,
279 quote_tick.ask_price
280 );
281 if let Err(e) =
282 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
283 {
284 tracing::error!("Failed to send quote tick: {e}");
285 }
286 }
287 Err(e) => {
288 tracing::error!(
289 "Failed to parse quote tick for {}: {e}",
290 data.coin
291 );
292 }
293 }
294 }
295 } else {
296 tracing::warn!(
297 "Received BBO for unknown coin: {} (no matching instrument found)",
298 data.coin
299 );
300 }
301 }
302 HyperliquidWsMessage::Trades { data } => {
303 let count = data.len();
304 tracing::debug!("Received {count} trade(s)");
305
306 for trade_data in data {
308 let coin = trade_data.coin;
309 let coin_map = coin_to_instrument_id.read().unwrap();
310
311 if let Some(&instrument_id) = coin_map.get(&coin) {
312 let instruments_map = instruments.read().unwrap();
313 if let Some(instrument) = instruments_map.get(&instrument_id) {
314 let ts_init = clock.get_time_ns();
315
316 match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
317 Ok(trade_tick) => {
318 if let Err(e) =
319 data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
320 {
321 tracing::error!("Failed to send trade tick: {e}");
322 }
323 }
324 Err(e) => {
325 tracing::error!("Failed to parse trade tick for {coin}: {e}");
326 }
327 }
328 }
329 } else {
330 tracing::warn!("Received trade for unknown coin: {coin}");
331 }
332 }
333 }
334 HyperliquidWsMessage::L2Book { data } => {
335 let coin = data.coin;
336 tracing::debug!("Received L2 book update for coin: {coin}");
337
338 let coin_map = coin_to_instrument_id.read().unwrap();
339 if let Some(&instrument_id) = coin_map.get(&data.coin) {
340 let instruments_map = instruments.read().unwrap();
341 if let Some(instrument) = instruments_map.get(&instrument_id) {
342 let ts_init = clock.get_time_ns();
343
344 match parse_ws_order_book_deltas(&data, instrument, ts_init) {
345 Ok(deltas) => {
346 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
347 OrderBookDeltas_API::new(deltas),
348 ))) {
349 tracing::error!("Failed to send order book deltas: {e}");
350 }
351 }
352 Err(e) => {
353 tracing::error!(
354 "Failed to parse order book deltas for {}: {e}",
355 data.coin
356 );
357 }
358 }
359 }
360 } else {
361 tracing::warn!("Received L2 book for unknown coin: {coin}");
362 }
363 }
364 HyperliquidWsMessage::Candle { data } => {
365 let coin = &data.s;
366 let interval = &data.i;
367 tracing::debug!("Received candle for {coin}:{interval}");
368
369 if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
370 let coin = Ustr::from(&data.s);
371 let coin_map = coin_to_instrument_id.read().unwrap();
372
373 if let Some(&instrument_id) = coin_map.get(&coin) {
374 let instruments_map = instruments.read().unwrap();
375 if let Some(instrument) = instruments_map.get(&instrument_id) {
376 let ts_init = clock.get_time_ns();
377
378 match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
379 Ok(bar) => {
380 if let Err(e) =
381 data_sender.send(DataEvent::Data(Data::Bar(bar)))
382 {
383 tracing::error!("Failed to send bar data: {e}");
384 }
385 }
386 Err(e) => {
387 tracing::error!("Failed to parse candle for {coin}: {e}");
388 }
389 }
390 }
391 } else {
392 tracing::warn!("Received candle for unknown coin: {coin}");
393 }
394 } else {
395 tracing::debug!("Received candle for {coin}:{interval} but no BarType tracked");
396 }
397 }
398 _ => {
399 tracing::trace!("Received unhandled WebSocket message: {:?}", msg);
401 }
402 }
403 }
404
405 fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
406 let instruments = self.instruments.read().unwrap();
407 instruments
408 .get(instrument_id)
409 .cloned()
410 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
411 }
412}
413
414fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
415 value
416 .and_then(|dt| dt.timestamp_nanos_opt())
417 .and_then(|nanos| u64::try_from(nanos).ok())
418 .map(UnixNanos::from)
419}
420
421impl HyperliquidDataClient {
422 #[allow(dead_code)]
423 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
424 if let Err(e) = sender.send(DataEvent::Data(data)) {
425 tracing::error!("Failed to emit data event: {e}");
426 }
427 }
428}
429
430#[async_trait::async_trait]
431impl DataClient for HyperliquidDataClient {
432 fn client_id(&self) -> ClientId {
433 self.client_id
434 }
435
436 fn venue(&self) -> Option<Venue> {
437 Some(self.venue())
438 }
439
440 fn start(&mut self) -> anyhow::Result<()> {
441 tracing::info!(
442 client_id = %self.client_id,
443 is_testnet = self.config.is_testnet,
444 http_proxy_url = ?self.config.http_proxy_url,
445 ws_proxy_url = ?self.config.ws_proxy_url,
446 "Starting Hyperliquid data client"
447 );
448 Ok(())
449 }
450
451 fn stop(&mut self) -> anyhow::Result<()> {
452 tracing::info!("Stopping Hyperliquid data client {}", self.client_id);
453 self.cancellation_token.cancel();
454 self.is_connected.store(false, Ordering::Relaxed);
455 Ok(())
456 }
457
458 fn reset(&mut self) -> anyhow::Result<()> {
459 tracing::debug!("Resetting Hyperliquid data client {}", self.client_id);
460 self.is_connected.store(false, Ordering::Relaxed);
461 self.cancellation_token = CancellationToken::new();
462 self.tasks.clear();
463 Ok(())
464 }
465
466 fn dispose(&mut self) -> anyhow::Result<()> {
467 tracing::debug!("Disposing Hyperliquid data client {}", self.client_id);
468 self.stop()
469 }
470
471 fn is_connected(&self) -> bool {
472 self.is_connected.load(Ordering::Acquire)
473 }
474
475 fn is_disconnected(&self) -> bool {
476 !self.is_connected()
477 }
478
479 async fn connect(&mut self) -> anyhow::Result<()> {
480 if self.is_connected() {
481 return Ok(());
482 }
483
484 let _instruments = self
486 .bootstrap_instruments()
487 .await
488 .context("failed to bootstrap instruments")?;
489
490 self.spawn_ws()
492 .await
493 .context("failed to spawn WebSocket client")?;
494
495 self.is_connected.store(true, Ordering::Relaxed);
496 tracing::info!(client_id = %self.client_id, "Connected");
497
498 Ok(())
499 }
500
501 async fn disconnect(&mut self) -> anyhow::Result<()> {
502 if !self.is_connected() {
503 return Ok(());
504 }
505
506 self.cancellation_token.cancel();
508
509 for task in self.tasks.drain(..) {
511 if let Err(e) = task.await {
512 tracing::error!("Error waiting for task to complete: {e}");
513 }
514 }
515
516 if let Err(e) = self.ws_client.disconnect().await {
518 tracing::error!("Error disconnecting WebSocket client: {e}");
519 }
520
521 {
523 let mut instruments = self.instruments.write().unwrap();
524 instruments.clear();
525 }
526
527 self.is_connected.store(false, Ordering::Relaxed);
528 tracing::info!(client_id = %self.client_id, "Disconnected");
529
530 Ok(())
531 }
532
533 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
534 tracing::debug!("Requesting all instruments");
535
536 let instruments = {
537 let instruments_map = self.instruments.read().unwrap();
538 instruments_map.values().cloned().collect()
539 };
540
541 let response = DataResponse::Instruments(InstrumentsResponse::new(
542 request.request_id,
543 request.client_id.unwrap_or(self.client_id),
544 self.venue(),
545 instruments,
546 datetime_to_unix_nanos(request.start),
547 datetime_to_unix_nanos(request.end),
548 self.clock.get_time_ns(),
549 request.params.clone(),
550 ));
551
552 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
553 tracing::error!("Failed to send instruments response: {e}");
554 }
555
556 Ok(())
557 }
558
559 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
560 tracing::debug!("Requesting instrument: {}", request.instrument_id);
561
562 let instrument = self.get_instrument(&request.instrument_id)?;
563
564 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
565 request.request_id,
566 request.client_id.unwrap_or(self.client_id),
567 instrument.id(),
568 instrument,
569 datetime_to_unix_nanos(request.start),
570 datetime_to_unix_nanos(request.end),
571 self.clock.get_time_ns(),
572 request.params.clone(),
573 )));
574
575 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
576 tracing::error!("Failed to send instrument response: {e}");
577 }
578
579 Ok(())
580 }
581
582 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
583 tracing::debug!("Requesting bars for {}", request.bar_type);
584
585 let http = self.http_client.clone();
586 let sender = self.data_sender.clone();
587 let bar_type = request.bar_type;
588 let start = request.start;
589 let end = request.end;
590 let limit = request.limit.map(|n| n.get() as u32);
591 let request_id = request.request_id;
592 let client_id = request.client_id.unwrap_or(self.client_id);
593 let params = request.params.clone();
594 let clock = self.clock;
595 let start_nanos = datetime_to_unix_nanos(start);
596 let end_nanos = datetime_to_unix_nanos(end);
597 let instruments = Arc::clone(&self.instruments);
598
599 tokio::spawn(async move {
600 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
601 Ok(bars) => {
602 let response = DataResponse::Bars(BarsResponse::new(
603 request_id,
604 client_id,
605 bar_type,
606 bars,
607 start_nanos,
608 end_nanos,
609 clock.get_time_ns(),
610 params,
611 ));
612 if let Err(e) = sender.send(DataEvent::Response(response)) {
613 tracing::error!("Failed to send bars response: {e}");
614 }
615 }
616 Err(e) => tracing::error!("Bar request failed: {e:?}"),
617 }
618 });
619
620 Ok(())
621 }
622
623 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
624 tracing::debug!("Requesting trades for {}", request.instrument_id);
625
626 tracing::warn!(
631 "Historical trade data not available via REST on Hyperliquid for {}",
632 request.instrument_id
633 );
634
635 let trades = Vec::new();
636
637 let response = DataResponse::Trades(TradesResponse::new(
638 request.request_id,
639 request.client_id.unwrap_or(self.client_id),
640 request.instrument_id,
641 trades,
642 datetime_to_unix_nanos(request.start),
643 datetime_to_unix_nanos(request.end),
644 self.clock.get_time_ns(),
645 request.params.clone(),
646 ));
647
648 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
649 tracing::error!("Failed to send trades response: {e}");
650 }
651
652 Ok(())
653 }
654
655 fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
656 tracing::debug!("Subscribing to trades: {}", subscription.instrument_id);
657
658 let ws = self.ws_client.clone();
659 let instrument_id = subscription.instrument_id;
660
661 tokio::spawn(async move {
662 if let Err(e) = ws.subscribe_trades(instrument_id).await {
663 tracing::error!("Failed to subscribe to trades: {e:?}");
664 }
665 });
666
667 Ok(())
668 }
669
670 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
671 tracing::debug!(
672 "Unsubscribing from trades: {}",
673 unsubscription.instrument_id
674 );
675
676 let ws = self.ws_client.clone();
677 let instrument_id = unsubscription.instrument_id;
678
679 tokio::spawn(async move {
680 if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
681 tracing::error!("Failed to unsubscribe from trades: {e:?}");
682 }
683 });
684
685 tracing::info!(
686 "Unsubscribed from trades for {}",
687 unsubscription.instrument_id
688 );
689
690 Ok(())
691 }
692
693 fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
694 tracing::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
695
696 if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
697 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
698 }
699
700 let ws = self.ws_client.clone();
701 let instrument_id = subscription.instrument_id;
702
703 tokio::spawn(async move {
704 if let Err(e) = ws.subscribe_book(instrument_id).await {
705 tracing::error!("Failed to subscribe to book deltas: {e:?}");
706 }
707 });
708
709 Ok(())
710 }
711
712 fn unsubscribe_book_deltas(
713 &mut self,
714 unsubscription: &UnsubscribeBookDeltas,
715 ) -> anyhow::Result<()> {
716 tracing::debug!(
717 "Unsubscribing from book deltas: {}",
718 unsubscription.instrument_id
719 );
720
721 let ws = self.ws_client.clone();
722 let instrument_id = unsubscription.instrument_id;
723
724 tokio::spawn(async move {
725 if let Err(e) = ws.unsubscribe_book(instrument_id).await {
726 tracing::error!("Failed to unsubscribe from book deltas: {e:?}");
727 }
728 });
729
730 Ok(())
731 }
732
733 fn subscribe_book_snapshots(
734 &mut self,
735 subscription: &SubscribeBookSnapshots,
736 ) -> anyhow::Result<()> {
737 tracing::debug!(
738 "Subscribing to book snapshots: {}",
739 subscription.instrument_id
740 );
741
742 if subscription.book_type != nautilus_model::enums::BookType::L2_MBP {
743 anyhow::bail!("Hyperliquid only supports L2_MBP order book snapshots");
744 }
745
746 let ws = self.ws_client.clone();
747 let instrument_id = subscription.instrument_id;
748
749 tokio::spawn(async move {
750 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
751 tracing::error!("Failed to subscribe to book snapshots: {e:?}");
752 }
753 });
754
755 Ok(())
756 }
757
758 fn unsubscribe_book_snapshots(
759 &mut self,
760 unsubscription: &UnsubscribeBookSnapshots,
761 ) -> anyhow::Result<()> {
762 tracing::debug!(
763 "Unsubscribing from book snapshots: {}",
764 unsubscription.instrument_id
765 );
766
767 let ws = self.ws_client.clone();
768 let instrument_id = unsubscription.instrument_id;
769
770 tokio::spawn(async move {
771 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
772 tracing::error!("Failed to unsubscribe from book snapshots: {e:?}");
773 }
774 });
775
776 Ok(())
777 }
778
779 fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
780 tracing::debug!("Subscribing to quotes: {}", subscription.instrument_id);
781
782 let ws = self.ws_client.clone();
783 let instrument_id = subscription.instrument_id;
784
785 tokio::spawn(async move {
786 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
787 tracing::error!("Failed to subscribe to quotes: {e:?}");
788 }
789 });
790
791 Ok(())
792 }
793
794 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
795 tracing::debug!(
796 "Unsubscribing from quotes: {}",
797 unsubscription.instrument_id
798 );
799
800 let ws = self.ws_client.clone();
801 let instrument_id = unsubscription.instrument_id;
802
803 tokio::spawn(async move {
804 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
805 tracing::error!("Failed to unsubscribe from quotes: {e:?}");
806 }
807 });
808
809 tracing::info!(
810 "Unsubscribed from quotes for {}",
811 unsubscription.instrument_id
812 );
813
814 Ok(())
815 }
816
817 fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
818 tracing::debug!("Subscribing to bars: {}", subscription.bar_type);
819
820 let instruments = self.instruments.read().unwrap();
821 let instrument_id = subscription.bar_type.instrument_id();
822 if !instruments.contains_key(&instrument_id) {
823 anyhow::bail!("Instrument {} not found", instrument_id);
824 }
825
826 drop(instruments);
827
828 let bar_type = subscription.bar_type;
829 let ws = self.ws_client.clone();
830
831 tokio::spawn(async move {
832 if let Err(e) = ws.subscribe_bars(bar_type).await {
833 tracing::error!("Failed to subscribe to bars: {e:?}");
834 }
835 });
836
837 tracing::info!("Subscribed to bars for {}", subscription.bar_type);
838
839 Ok(())
840 }
841
842 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
843 tracing::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
844
845 let bar_type = unsubscription.bar_type;
846 let ws = self.ws_client.clone();
847
848 tokio::spawn(async move {
849 if let Err(e) = ws.unsubscribe_bars(bar_type).await {
850 tracing::error!("Failed to unsubscribe from bars: {e:?}");
851 }
852 });
853
854 tracing::info!("Unsubscribed from bars for {}", unsubscription.bar_type);
855
856 Ok(())
857 }
858}
859
860pub(crate) fn candle_to_bar(
861 candle: &HyperliquidCandle,
862 bar_type: BarType,
863 price_precision: u8,
864 size_precision: u8,
865) -> anyhow::Result<Bar> {
866 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
867 let ts_event = ts_init;
868
869 let open = candle.open.parse::<f64>().context("parse open price")?;
870 let high = candle.high.parse::<f64>().context("parse high price")?;
871 let low = candle.low.parse::<f64>().context("parse low price")?;
872 let close = candle.close.parse::<f64>().context("parse close price")?;
873 let volume = candle.volume.parse::<f64>().context("parse volume")?;
874
875 Ok(Bar::new(
876 bar_type,
877 Price::new(open, price_precision),
878 Price::new(high, price_precision),
879 Price::new(low, price_precision),
880 Price::new(close, price_precision),
881 Quantity::new(volume, size_precision),
882 ts_event,
883 ts_init,
884 ))
885}
886
887async fn request_bars_from_http(
889 http_client: HyperliquidHttpClient,
890 bar_type: BarType,
891 start: Option<DateTime<Utc>>,
892 end: Option<DateTime<Utc>>,
893 limit: Option<u32>,
894 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
895) -> anyhow::Result<Vec<Bar>> {
896 let instrument_id = bar_type.instrument_id();
898 let instrument = {
899 let guard = instruments.read().unwrap();
900 guard
901 .get(&instrument_id)
902 .cloned()
903 .context("instrument not found in cache")?
904 };
905
906 let price_precision = instrument.price_precision();
907 let size_precision = instrument.size_precision();
908
909 let coin = instrument_id
911 .symbol
912 .as_str()
913 .split('-')
914 .next()
915 .context("invalid instrument symbol")?;
916
917 let interval = bar_type_to_interval(&bar_type)?;
918
919 let now = Utc::now();
921 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
922 let start_time = if let Some(start) = start {
923 start.timestamp_millis() as u64
924 } else {
925 let spec = bar_type.spec();
927 let step_ms = match spec.aggregation {
928 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
929 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
930 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
931 _ => 60_000,
932 };
933 end_time.saturating_sub(1000 * step_ms)
934 };
935
936 let candles = http_client
937 .info_candle_snapshot(coin, interval, start_time, end_time)
938 .await
939 .context("failed to fetch candle snapshot from Hyperliquid")?;
940
941 let mut bars: Vec<Bar> = candles
942 .iter()
943 .filter_map(|candle| {
944 candle_to_bar(candle, bar_type, price_precision, size_precision)
945 .map_err(|e| {
946 tracing::warn!("Failed to convert candle to bar: {e}");
947 e
948 })
949 .ok()
950 })
951 .collect();
952
953 if let Some(limit) = limit
954 && bars.len() > limit as usize
955 {
956 bars = bars.into_iter().take(limit as usize).collect();
957 }
958
959 tracing::debug!("Fetched {} bars for {}", bars.len(), bar_type);
960 Ok(bars)
961}