1use std::{
19 collections::HashSet,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::BarType,
30 identifiers::AccountId,
31 instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34 RECONNECTED,
35 retry::{RetryManager, create_websocket_retry_manager},
36 websocket::{SubscriptionState, WebSocketClient},
37};
38use tokio_tungstenite::tungstenite::Message;
39use ustr::Ustr;
40
41use super::{
42 client::AssetContextDataType,
43 error::HyperliquidWsError,
44 messages::{
45 CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
46 SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
47 },
48 parse::{
49 parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
50 parse_ws_order_status_report, parse_ws_quote_tick, parse_ws_trade_tick,
51 },
52};
53
54#[derive(Debug)]
56#[allow(
57 clippy::large_enum_variant,
58 reason = "Commands are ephemeral and immediately consumed"
59)]
60#[allow(private_interfaces)]
61pub enum HandlerCommand {
62 SetClient(WebSocketClient),
64 Disconnect,
66 Subscribe {
68 subscriptions: Vec<SubscriptionRequest>,
69 },
70 Unsubscribe {
72 subscriptions: Vec<SubscriptionRequest>,
73 },
74 InitializeInstruments(Vec<InstrumentAny>),
76 UpdateInstrument(InstrumentAny),
78 AddBarType { key: String, bar_type: BarType },
80 RemoveBarType { key: String },
82 UpdateAssetContextSubs {
84 coin: Ustr,
85 data_types: HashSet<AssetContextDataType>,
86 },
87}
88
89pub(super) struct FeedHandler {
90 clock: &'static AtomicTime,
91 signal: Arc<AtomicBool>,
92 client: Option<WebSocketClient>,
93 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
94 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
95 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
96 account_id: Option<AccountId>,
97 subscriptions: SubscriptionState,
98 retry_manager: RetryManager<HyperliquidWsError>,
99 message_buffer: Vec<NautilusWsMessage>,
100 instruments_cache: AHashMap<Ustr, InstrumentAny>,
101 bar_types_cache: AHashMap<String, BarType>,
102 bar_cache: AHashMap<String, CandleData>,
103 asset_context_subs: AHashMap<Ustr, HashSet<AssetContextDataType>>,
104 mark_price_cache: AHashMap<Ustr, String>,
105 index_price_cache: AHashMap<Ustr, String>,
106 funding_rate_cache: AHashMap<Ustr, String>,
107}
108
109impl FeedHandler {
110 pub(super) fn new(
112 signal: Arc<AtomicBool>,
113 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
114 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
115 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
116 account_id: Option<AccountId>,
117 subscriptions: SubscriptionState,
118 ) -> Self {
119 Self {
120 clock: get_atomic_clock_realtime(),
121 signal,
122 client: None,
123 cmd_rx,
124 raw_rx,
125 out_tx,
126 account_id,
127 subscriptions,
128 retry_manager: create_websocket_retry_manager(),
129 message_buffer: Vec::new(),
130 instruments_cache: AHashMap::new(),
131 bar_types_cache: AHashMap::new(),
132 bar_cache: AHashMap::new(),
133 asset_context_subs: AHashMap::new(),
134 mark_price_cache: AHashMap::new(),
135 index_price_cache: AHashMap::new(),
136 funding_rate_cache: AHashMap::new(),
137 }
138 }
139
140 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
142 self.out_tx
143 .send(msg)
144 .map_err(|e| format!("Failed to send message: {e}"))
145 }
146
147 pub(super) fn is_stopped(&self) -> bool {
149 self.signal.load(Ordering::Relaxed)
150 }
151
152 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
154 if let Some(client) = &self.client {
155 self.retry_manager
156 .execute_with_retry(
157 "websocket_send",
158 || {
159 let payload = payload.clone();
160 async move {
161 client.send_text(payload, None).await.map_err(|e| {
162 HyperliquidWsError::ClientError(format!("Send failed: {e}"))
163 })
164 }
165 },
166 should_retry_hyperliquid_error,
167 create_hyperliquid_timeout_error,
168 )
169 .await
170 .map_err(|e| anyhow::anyhow!("{e}"))
171 } else {
172 Err(anyhow::anyhow!("No WebSocket client available"))
173 }
174 }
175
176 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
177 if !self.message_buffer.is_empty() {
178 return Some(self.message_buffer.remove(0));
179 }
180
181 loop {
182 tokio::select! {
183 Some(cmd) = self.cmd_rx.recv() => {
184 match cmd {
185 HandlerCommand::SetClient(client) => {
186 tracing::debug!("Setting WebSocket client in handler");
187 self.client = Some(client);
188 }
189 HandlerCommand::Disconnect => {
190 tracing::debug!("Handler received disconnect command");
191 if let Some(ref client) = self.client {
192 client.disconnect().await;
193 }
194 self.signal.store(true, Ordering::SeqCst);
195 return None;
196 }
197 HandlerCommand::Subscribe { subscriptions } => {
198 for subscription in subscriptions {
199 let key = subscription_to_key(&subscription);
200 self.subscriptions.mark_subscribe(&key);
201
202 let request = HyperliquidWsRequest::Subscribe { subscription };
203 match serde_json::to_string(&request) {
204 Ok(payload) => {
205 tracing::debug!("Sending subscribe payload: {payload}");
206 if let Err(e) = self.send_with_retry(payload).await {
207 tracing::error!("Error subscribing to {key}: {e}");
208 self.subscriptions.mark_failure(&key);
209 }
210 }
211 Err(e) => {
212 tracing::error!("Error serializing subscription for {key}: {e}");
213 self.subscriptions.mark_failure(&key);
214 }
215 }
216 }
217 }
218 HandlerCommand::Unsubscribe { subscriptions } => {
219 for subscription in subscriptions {
220 let key = subscription_to_key(&subscription);
221 self.subscriptions.mark_unsubscribe(&key);
222
223 let request = HyperliquidWsRequest::Unsubscribe { subscription };
224 match serde_json::to_string(&request) {
225 Ok(payload) => {
226 tracing::debug!("Sending unsubscribe payload: {payload}");
227 if let Err(e) = self.send_with_retry(payload).await {
228 tracing::error!("Error unsubscribing from {key}: {e}");
229 }
230 }
231 Err(e) => {
232 tracing::error!("Error serializing unsubscription for {key}: {e}");
233 }
234 }
235 }
236 }
237 HandlerCommand::InitializeInstruments(instruments) => {
238 for inst in instruments {
239 let full_symbol = inst.symbol().inner();
240 let coin = inst.raw_symbol().inner();
241
242 self.instruments_cache.insert(full_symbol, inst.clone());
243 self.instruments_cache.insert(coin, inst);
244 }
245 }
246 HandlerCommand::UpdateInstrument(inst) => {
247 let full_symbol = inst.symbol().inner();
248 let coin = inst.raw_symbol().inner();
249
250 self.instruments_cache.insert(full_symbol, inst.clone());
251 self.instruments_cache.insert(coin, inst);
252 }
253 HandlerCommand::AddBarType { key, bar_type } => {
254 self.bar_types_cache.insert(key, bar_type);
255 }
256 HandlerCommand::RemoveBarType { key } => {
257 self.bar_types_cache.remove(&key);
258 self.bar_cache.remove(&key);
259 }
260 HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
261 if data_types.is_empty() {
262 self.asset_context_subs.remove(&coin);
263 } else {
264 self.asset_context_subs.insert(coin, data_types);
265 }
266 }
267 }
268 continue;
269 }
270
271 Some(raw_msg) = self.raw_rx.recv() => {
272 match raw_msg {
273 Message::Text(text) => {
274 if text == RECONNECTED {
275 tracing::info!("Received RECONNECTED sentinel");
276 return Some(NautilusWsMessage::Reconnected);
277 }
278
279 match serde_json::from_str::<HyperliquidWsMessage>(&text) {
280 Ok(msg) => {
281 let ts_init = self.clock.get_time_ns();
282
283 let nautilus_msgs = Self::parse_to_nautilus_messages(
284 msg,
285 &self.instruments_cache,
286 &self.bar_types_cache,
287 self.account_id,
288 ts_init,
289 &self.asset_context_subs,
290 &mut self.mark_price_cache,
291 &mut self.index_price_cache,
292 &mut self.funding_rate_cache,
293 &mut self.bar_cache,
294 );
295
296 if !nautilus_msgs.is_empty() {
297 let mut iter = nautilus_msgs.into_iter();
298 let first = iter.next().unwrap();
299 self.message_buffer.extend(iter);
300 return Some(first);
301 }
302 }
303 Err(e) => {
304 tracing::error!("Error parsing WebSocket message: {e}, text: {text}");
305 }
306 }
307 }
308 Message::Ping(data) => {
309 if let Some(ref client) = self.client
310 && let Err(e) = client.send_pong(data.to_vec()).await {
311 tracing::error!("Error sending pong: {e}");
312 }
313 }
314 Message::Close(_) => {
315 tracing::info!("Received WebSocket close frame");
316 return None;
317 }
318 _ => {}
319 }
320 }
321
322 else => {
323 tracing::debug!("Handler shutting down: stream ended or command channel closed");
324 return None;
325 }
326 }
327 }
328 }
329
330 #[allow(clippy::too_many_arguments)]
331 fn parse_to_nautilus_messages(
332 msg: HyperliquidWsMessage,
333 instruments: &AHashMap<Ustr, InstrumentAny>,
334 bar_types: &AHashMap<String, BarType>,
335 account_id: Option<AccountId>,
336 ts_init: UnixNanos,
337 asset_context_subs: &AHashMap<Ustr, HashSet<AssetContextDataType>>,
338 mark_price_cache: &mut AHashMap<Ustr, String>,
339 index_price_cache: &mut AHashMap<Ustr, String>,
340 funding_rate_cache: &mut AHashMap<Ustr, String>,
341 bar_cache: &mut AHashMap<String, CandleData>,
342 ) -> Vec<NautilusWsMessage> {
343 let mut result = Vec::new();
344
345 match msg {
346 HyperliquidWsMessage::OrderUpdates { data } => {
347 if let Some(account_id) = account_id
348 && let Some(msg) =
349 Self::handle_order_updates(&data, instruments, account_id, ts_init)
350 {
351 result.push(msg);
352 }
353 }
354 HyperliquidWsMessage::UserEvents { data } => {
355 if let Some(account_id) = account_id
356 && let WsUserEventData::Fills { fills } = data
357 && let Some(msg) =
358 Self::handle_user_fills(&fills, instruments, account_id, ts_init)
359 {
360 result.push(msg);
361 }
362 }
363 HyperliquidWsMessage::Trades { data } => {
364 if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
365 result.push(msg);
366 }
367 }
368 HyperliquidWsMessage::Bbo { data } => {
369 if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
370 result.push(msg);
371 }
372 }
373 HyperliquidWsMessage::L2Book { data } => {
374 if let Some(msg) = Self::handle_l2_book(&data, instruments, ts_init) {
375 result.push(msg);
376 }
377 }
378 HyperliquidWsMessage::Candle { data } => {
379 if let Some(msg) =
380 Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
381 {
382 result.push(msg);
383 }
384 }
385 HyperliquidWsMessage::ActiveAssetCtx { data }
386 | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
387 result.extend(Self::handle_asset_context(
388 &data,
389 instruments,
390 asset_context_subs,
391 mark_price_cache,
392 index_price_cache,
393 funding_rate_cache,
394 ts_init,
395 ));
396 }
397 HyperliquidWsMessage::Error { data } => {
398 tracing::warn!("Received error from Hyperliquid WebSocket: {data}");
399 }
400 _ => {}
402 }
403
404 result
405 }
406
407 fn handle_order_updates(
408 data: &[super::messages::WsOrderData],
409 instruments: &AHashMap<Ustr, InstrumentAny>,
410 account_id: AccountId,
411 ts_init: UnixNanos,
412 ) -> Option<NautilusWsMessage> {
413 let mut exec_reports = Vec::new();
414
415 for order_update in data {
416 if let Some(instrument) = instruments.get(&order_update.order.coin) {
417 match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
418 Ok(report) => {
419 exec_reports.push(ExecutionReport::Order(report));
420 }
421 Err(e) => {
422 tracing::error!("Error parsing order update: {e}");
423 }
424 }
425 } else {
426 tracing::debug!("No instrument found for coin: {}", order_update.order.coin);
427 }
428 }
429
430 if !exec_reports.is_empty() {
431 Some(NautilusWsMessage::ExecutionReports(exec_reports))
432 } else {
433 None
434 }
435 }
436
437 fn handle_user_fills(
438 fills: &[super::messages::WsFillData],
439 instruments: &AHashMap<Ustr, InstrumentAny>,
440 account_id: AccountId,
441 ts_init: UnixNanos,
442 ) -> Option<NautilusWsMessage> {
443 let mut exec_reports = Vec::new();
444
445 for fill in fills {
446 if let Some(instrument) = instruments.get(&fill.coin) {
447 match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
448 Ok(report) => {
449 exec_reports.push(ExecutionReport::Fill(report));
450 }
451 Err(e) => {
452 tracing::error!("Error parsing fill: {e}");
453 }
454 }
455 } else {
456 tracing::debug!("No instrument found for coin: {}", fill.coin);
457 }
458 }
459
460 if !exec_reports.is_empty() {
461 Some(NautilusWsMessage::ExecutionReports(exec_reports))
462 } else {
463 None
464 }
465 }
466
467 fn handle_trades(
468 data: &[super::messages::WsTradeData],
469 instruments: &AHashMap<Ustr, InstrumentAny>,
470 ts_init: UnixNanos,
471 ) -> Option<NautilusWsMessage> {
472 let mut trade_ticks = Vec::new();
473
474 for trade in data {
475 if let Some(instrument) = instruments.get(&trade.coin) {
476 match parse_ws_trade_tick(trade, instrument, ts_init) {
477 Ok(tick) => trade_ticks.push(tick),
478 Err(e) => {
479 tracing::error!("Error parsing trade tick: {e}");
480 }
481 }
482 } else {
483 tracing::debug!("No instrument found for coin: {}", trade.coin);
484 }
485 }
486
487 if !trade_ticks.is_empty() {
488 Some(NautilusWsMessage::Trades(trade_ticks))
489 } else {
490 None
491 }
492 }
493
494 fn handle_bbo(
495 data: &super::messages::WsBboData,
496 instruments: &AHashMap<Ustr, InstrumentAny>,
497 ts_init: UnixNanos,
498 ) -> Option<NautilusWsMessage> {
499 if let Some(instrument) = instruments.get(&data.coin) {
500 match parse_ws_quote_tick(data, instrument, ts_init) {
501 Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
502 Err(e) => {
503 tracing::error!("Error parsing quote tick: {e}");
504 None
505 }
506 }
507 } else {
508 tracing::debug!("No instrument found for coin: {}", data.coin);
509 None
510 }
511 }
512
513 fn handle_l2_book(
514 data: &super::messages::WsBookData,
515 instruments: &AHashMap<Ustr, InstrumentAny>,
516 ts_init: UnixNanos,
517 ) -> Option<NautilusWsMessage> {
518 if let Some(instrument) = instruments.get(&data.coin) {
519 match parse_ws_order_book_deltas(data, instrument, ts_init) {
520 Ok(deltas) => Some(NautilusWsMessage::Deltas(deltas)),
521 Err(e) => {
522 tracing::error!("Error parsing order book deltas: {e}");
523 None
524 }
525 }
526 } else {
527 tracing::debug!("No instrument found for coin: {}", data.coin);
528 None
529 }
530 }
531
532 fn handle_candle(
533 data: &CandleData,
534 instruments: &AHashMap<Ustr, InstrumentAny>,
535 bar_types: &AHashMap<String, BarType>,
536 bar_cache: &mut AHashMap<String, CandleData>,
537 ts_init: UnixNanos,
538 ) -> Option<NautilusWsMessage> {
539 let key = format!("candle:{}:{}", data.s, data.i);
540
541 let mut closed_bar = None;
542 if let Some(cached) = bar_cache.get(&key) {
543 if cached.close_time != data.close_time {
545 tracing::debug!(
546 "Bar period changed for {}: prev_close_time={}, new_close_time={}",
547 data.s,
548 cached.close_time,
549 data.close_time
550 );
551 closed_bar = Some(cached.clone());
552 }
553 }
554
555 bar_cache.insert(key.clone(), data.clone());
556
557 if let Some(closed_data) = closed_bar {
558 if let Some(bar_type) = bar_types.get(&key) {
559 if let Some(instrument) = instruments.get(&data.s) {
560 match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
561 Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
562 Err(e) => {
563 tracing::error!("Error parsing closed candle: {e}");
564 }
565 }
566 } else {
567 tracing::debug!("No instrument found for coin: {}", data.s);
568 }
569 } else {
570 tracing::debug!("No bar type found for key: {key}");
571 }
572 }
573
574 None
575 }
576
577 fn handle_asset_context(
578 data: &WsActiveAssetCtxData,
579 instruments: &AHashMap<Ustr, InstrumentAny>,
580 asset_context_subs: &AHashMap<Ustr, HashSet<AssetContextDataType>>,
581 mark_price_cache: &mut AHashMap<Ustr, String>,
582 index_price_cache: &mut AHashMap<Ustr, String>,
583 funding_rate_cache: &mut AHashMap<Ustr, String>,
584 ts_init: UnixNanos,
585 ) -> Vec<NautilusWsMessage> {
586 let mut result = Vec::new();
587
588 let coin = match data {
589 WsActiveAssetCtxData::Perp { coin, .. } => coin,
590 WsActiveAssetCtxData::Spot { coin, .. } => coin,
591 };
592
593 if let Some(instrument) = instruments.get(coin) {
594 let (mark_px, oracle_px, funding) = match data {
595 WsActiveAssetCtxData::Perp { ctx, .. } => (
596 &ctx.shared.mark_px,
597 Some(&ctx.oracle_px),
598 Some(&ctx.funding),
599 ),
600 WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
601 };
602
603 let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
604 let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
605 let funding_changed =
606 funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
607
608 let subscribed_types = asset_context_subs.get(coin);
609
610 if mark_changed || index_changed || funding_changed {
611 match parse_ws_asset_context(data, instrument, ts_init) {
612 Ok((mark_price, index_price, funding_rate)) => {
613 if mark_changed
614 && subscribed_types
615 .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
616 {
617 mark_price_cache.insert(*coin, mark_px.clone());
618 result.push(NautilusWsMessage::MarkPrice(mark_price));
619 }
620 if index_changed
621 && subscribed_types
622 .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
623 {
624 if let Some(px) = oracle_px {
625 index_price_cache.insert(*coin, px.clone());
626 }
627 if let Some(index) = index_price {
628 result.push(NautilusWsMessage::IndexPrice(index));
629 }
630 }
631 if funding_changed
632 && subscribed_types
633 .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
634 {
635 if let Some(rate) = funding {
636 funding_rate_cache.insert(*coin, rate.clone());
637 }
638 if let Some(funding) = funding_rate {
639 result.push(NautilusWsMessage::FundingRate(funding));
640 }
641 }
642 }
643 Err(e) => {
644 tracing::error!("Error parsing asset context: {e}");
645 }
646 }
647 }
648 } else {
649 tracing::debug!("No instrument found for coin: {coin}");
650 }
651
652 result
653 }
654}
655
656fn subscription_to_key(sub: &SubscriptionRequest) -> String {
658 match sub {
659 SubscriptionRequest::AllMids { dex } => {
660 if let Some(dex_name) = dex {
661 format!("allMids:{dex_name}")
662 } else {
663 "allMids".to_string()
664 }
665 }
666 SubscriptionRequest::Notification { user } => format!("notification:{user}"),
667 SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
668 SubscriptionRequest::Candle { coin, interval } => {
669 format!("candle:{coin}:{}", interval.as_str())
670 }
671 SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
672 SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
673 SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
674 SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
675 SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
676 SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
677 SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
678 format!("userNonFundingLedgerUpdates:{user}")
679 }
680 SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
681 SubscriptionRequest::ActiveSpotAssetCtx { coin } => format!("activeSpotAssetCtx:{coin}"),
682 SubscriptionRequest::ActiveAssetData { user, coin } => {
683 format!("activeAssetData:{user}:{coin}")
684 }
685 SubscriptionRequest::UserTwapSliceFills { user } => format!("userTwapSliceFills:{user}"),
686 SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
687 SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
688 }
689}
690
691pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
693 match error {
694 HyperliquidWsError::TungsteniteError(_) => true,
695 HyperliquidWsError::ClientError(msg) => {
696 let msg_lower = msg.to_lowercase();
697 msg_lower.contains("timeout")
698 || msg_lower.contains("timed out")
699 || msg_lower.contains("connection")
700 || msg_lower.contains("network")
701 }
702 _ => false,
703 }
704}
705
706pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
708 HyperliquidWsError::ClientError(msg)
709}