1use std::{
19 str::FromStr,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use ahash::AHashSet;
25use anyhow::Context;
26use async_trait::async_trait;
27use nautilus_common::{
28 clients::ExecutionClient,
29 live::{runner::get_exec_event_sender, runtime::get_runtime},
30 messages::{
31 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
32 execution::{
33 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
34 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
35 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
36 },
37 },
38};
39use nautilus_core::{
40 MUTEX_POISONED, UUID4, UnixNanos,
41 time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45 accounts::AccountAny,
46 enums::{AccountType, OmsType, OrderStatus, OrderType},
47 identifiers::{AccountId, ClientId, ClientOrderId, Venue},
48 orders::{Order, any::OrderAny},
49 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
50 types::{AccountBalance, MarginBalance},
51};
52use tokio::task::JoinHandle;
53use ustr::Ustr;
54
55use crate::{
56 common::{
57 consts::{HYPERLIQUID_VENUE, NAUTILUS_BUILDER_FEE_ADDRESS, NAUTILUS_BUILDER_FEE_TENTHS_BP},
58 credential::Secrets,
59 parse::{
60 client_order_id_to_cancel_request_with_asset, extract_error_message,
61 is_response_successful, order_to_hyperliquid_request_with_asset,
62 parse_account_balances_and_margins,
63 },
64 },
65 config::HyperliquidExecClientConfig,
66 http::{
67 client::HyperliquidHttpClient,
68 models::{
69 ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
70 HyperliquidExecGrouping, HyperliquidExecModifyOrderRequest,
71 },
72 },
73 websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
74};
75
76#[derive(Debug)]
77pub struct HyperliquidExecutionClient {
78 core: ExecutionClientCore,
79 clock: &'static AtomicTime,
80 config: HyperliquidExecClientConfig,
81 emitter: ExecutionEventEmitter,
82 http_client: HyperliquidHttpClient,
83 ws_client: HyperliquidWebSocketClient,
84 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
85 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
86}
87
88impl HyperliquidExecutionClient {
89 pub fn config(&self) -> &HyperliquidExecClientConfig {
91 &self.config
92 }
93
94 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
95 let instrument_id = order.instrument_id();
98 let symbol = instrument_id.symbol.as_str();
99 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
100 anyhow::bail!(
101 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
102 );
103 }
104
105 match order.order_type() {
107 OrderType::Market
108 | OrderType::Limit
109 | OrderType::StopMarket
110 | OrderType::StopLimit
111 | OrderType::MarketIfTouched
112 | OrderType::LimitIfTouched => {}
113 _ => anyhow::bail!(
114 "Unsupported order type for Hyperliquid: {:?}",
115 order.order_type()
116 ),
117 }
118
119 if matches!(
121 order.order_type(),
122 OrderType::StopMarket
123 | OrderType::StopLimit
124 | OrderType::MarketIfTouched
125 | OrderType::LimitIfTouched
126 ) && order.trigger_price().is_none()
127 {
128 anyhow::bail!(
129 "Conditional orders require a trigger price for Hyperliquid: {:?}",
130 order.order_type()
131 );
132 }
133
134 if matches!(
136 order.order_type(),
137 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
138 ) && order.price().is_none()
139 {
140 anyhow::bail!(
141 "Limit orders require a limit price for Hyperliquid: {:?}",
142 order.order_type()
143 );
144 }
145
146 Ok(())
147 }
148
149 pub fn new(
155 core: ExecutionClientCore,
156 config: HyperliquidExecClientConfig,
157 ) -> anyhow::Result<Self> {
158 if !config.has_credentials() {
159 anyhow::bail!("Hyperliquid execution client requires private key");
160 }
161
162 let secrets = Secrets::from_json(&format!(
163 r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
164 config.private_key, config.is_testnet
165 ))
166 .context("failed to create secrets from private key")?;
167
168 let mut http_client = HyperliquidHttpClient::with_secrets(
169 &secrets,
170 Some(config.http_timeout_secs),
171 config.http_proxy_url.clone(),
172 )
173 .context("failed to create Hyperliquid HTTP client")?;
174
175 http_client.set_account_id(core.account_id);
176
177 let ws_client =
179 HyperliquidWebSocketClient::new(None, config.is_testnet, Some(core.account_id));
180
181 let clock = get_atomic_clock_realtime();
182 let emitter = ExecutionEventEmitter::new(
183 clock,
184 core.trader_id,
185 core.account_id,
186 AccountType::Margin,
187 None,
188 );
189
190 Ok(Self {
191 core,
192 clock,
193 config,
194 emitter,
195 http_client,
196 ws_client,
197 pending_tasks: Mutex::new(Vec::new()),
198 ws_stream_handle: Mutex::new(None),
199 })
200 }
201
202 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
203 if self.core.instruments_initialized() {
204 return Ok(());
205 }
206
207 let instruments = self
208 .http_client
209 .request_instruments()
210 .await
211 .context("failed to request Hyperliquid instruments")?;
212
213 if instruments.is_empty() {
214 log::warn!(
215 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
216 );
217 } else {
218 log::info!("Initialized {} instruments", instruments.len());
219
220 for instrument in &instruments {
221 self.http_client.cache_instrument(instrument.clone());
222 }
223 }
224
225 self.core.set_instruments_initialized();
226 Ok(())
227 }
228
229 async fn refresh_account_state(&self) -> anyhow::Result<()> {
230 let account_address = self.get_account_address()?;
231
232 let clearinghouse_state = self
233 .http_client
234 .info_clearinghouse_state(&account_address)
235 .await
236 .context("failed to fetch clearinghouse state")?;
237
238 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
240 .context("failed to deserialize clearinghouse state")?;
241
242 log::debug!(
243 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
244 state.cross_margin_summary,
245 state.asset_positions.len()
246 );
247
248 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
250 let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
251 .context("failed to parse account balances and margins")?;
252
253 let ts_event = self.clock.get_time_ns();
255 self.emitter
256 .emit_account_state(balances, margins, true, ts_event);
257
258 log::info!("Account state updated successfully");
259 } else {
260 log::warn!("No cross margin summary in clearinghouse state");
261 }
262
263 Ok(())
264 }
265
266 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
267 let account_id = self.core.account_id;
268
269 if self.core.cache().account(&account_id).is_some() {
270 log::info!("Account {account_id} registered");
271 return Ok(());
272 }
273
274 let start = Instant::now();
275 let timeout = Duration::from_secs_f64(timeout_secs);
276 let interval = Duration::from_millis(10);
277
278 loop {
279 tokio::time::sleep(interval).await;
280
281 if self.core.cache().account(&account_id).is_some() {
282 log::info!("Account {account_id} registered");
283 return Ok(());
284 }
285
286 if start.elapsed() >= timeout {
287 anyhow::bail!(
288 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
289 );
290 }
291 }
292 }
293
294 fn get_user_address(&self) -> anyhow::Result<String> {
295 self.http_client
296 .get_user_address()
297 .context("failed to get user address from HTTP client")
298 }
299
300 fn get_account_address(&self) -> anyhow::Result<String> {
301 match &self.config.vault_address {
302 Some(vault) => Ok(vault.clone()),
303 None => self.get_user_address(),
304 }
305 }
306
307 fn spawn_task<F>(&self, description: &'static str, fut: F)
308 where
309 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
310 {
311 let runtime = get_runtime();
312 let handle = runtime.spawn(async move {
313 if let Err(e) = fut.await {
314 log::warn!("{description} failed: {e:?}");
315 }
316 });
317
318 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
319 tasks.retain(|handle| !handle.is_finished());
320 tasks.push(handle);
321 }
322
323 fn abort_pending_tasks(&self) {
324 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
325 for handle in tasks.drain(..) {
326 handle.abort();
327 }
328 }
329}
330
331#[async_trait(?Send)]
332impl ExecutionClient for HyperliquidExecutionClient {
333 fn is_connected(&self) -> bool {
334 self.core.is_connected()
335 }
336
337 fn client_id(&self) -> ClientId {
338 self.core.client_id
339 }
340
341 fn account_id(&self) -> AccountId {
342 self.core.account_id
343 }
344
345 fn venue(&self) -> Venue {
346 *HYPERLIQUID_VENUE
347 }
348
349 fn oms_type(&self) -> OmsType {
350 self.core.oms_type
351 }
352
353 fn get_account(&self) -> Option<AccountAny> {
354 self.core.cache().account(&self.core.account_id).cloned()
355 }
356
357 fn generate_account_state(
358 &self,
359 balances: Vec<AccountBalance>,
360 margins: Vec<MarginBalance>,
361 reported: bool,
362 ts_event: UnixNanos,
363 ) -> anyhow::Result<()> {
364 self.emitter
365 .emit_account_state(balances, margins, reported, ts_event);
366 Ok(())
367 }
368
369 fn start(&mut self) -> anyhow::Result<()> {
370 if self.core.is_started() {
371 return Ok(());
372 }
373
374 let sender = get_exec_event_sender();
375 self.emitter.set_sender(sender);
376 self.core.set_started();
377
378 log::info!(
379 "Started: client_id={}, account_id={}, is_testnet={}, vault_address={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
380 self.core.client_id,
381 self.core.account_id,
382 self.config.is_testnet,
383 self.config.vault_address,
384 self.config.http_proxy_url,
385 self.config.ws_proxy_url,
386 );
387
388 Ok(())
389 }
390
391 fn stop(&mut self) -> anyhow::Result<()> {
392 if self.core.is_stopped() {
393 return Ok(());
394 }
395
396 log::info!("Stopping Hyperliquid execution client");
397
398 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
400 handle.abort();
401 }
402
403 self.abort_pending_tasks();
405
406 if self.core.is_connected() {
408 let runtime = get_runtime();
409 runtime.block_on(async {
410 if let Err(e) = self.ws_client.disconnect().await {
411 log::warn!("Error disconnecting WebSocket client: {e}");
412 }
413 });
414 }
415
416 self.core.set_disconnected();
417 self.core.set_stopped();
418
419 log::info!("Hyperliquid execution client stopped");
420 Ok(())
421 }
422
423 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
424 let order = self
425 .core
426 .cache()
427 .order(&cmd.client_order_id)
428 .cloned()
429 .ok_or_else(|| {
430 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
431 })?;
432
433 if order.is_closed() {
434 log::warn!("Cannot submit closed order {}", order.client_order_id());
435 return Ok(());
436 }
437
438 if let Err(e) = self.validate_order_submission(&order) {
439 self.emitter
440 .emit_order_denied(&order, &format!("Validation failed: {e}"));
441 return Err(e);
442 }
443
444 let http_client = self.http_client.clone();
445 let symbol = order.instrument_id().symbol.to_string();
446
447 let asset = match http_client.get_asset_index(&symbol) {
449 Some(a) => a,
450 None => {
451 self.emitter
452 .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
453 return Ok(());
454 }
455 };
456
457 let hyperliquid_order = match order_to_hyperliquid_request_with_asset(&order, asset) {
459 Ok(req) => req,
460 Err(e) => {
461 self.emitter
462 .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
463 return Ok(());
464 }
465 };
466
467 let cloid = Cloid::from_client_order_id(order.client_order_id());
470 self.ws_client
471 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
472
473 self.emitter.emit_order_submitted(&order);
474
475 let builder_fee = HyperliquidExecBuilderFee {
476 address: NAUTILUS_BUILDER_FEE_ADDRESS.to_string(),
477 fee_tenths_bp: NAUTILUS_BUILDER_FEE_TENTHS_BP,
478 };
479
480 let emitter = self.emitter.clone();
481 let clock = self.clock;
482 let ws_client = self.ws_client.clone();
483 let cloid_hex = Ustr::from(&cloid.to_hex());
484
485 self.spawn_task("submit_order", async move {
486 let action = HyperliquidExecAction::Order {
487 orders: vec![hyperliquid_order],
488 grouping: HyperliquidExecGrouping::Na,
489 builder: Some(builder_fee),
490 };
491
492 match http_client.post_action_exec(&action).await {
493 Ok(response) => {
494 if is_response_successful(&response) {
495 log::info!("Order submitted successfully: {response:?}");
496 } else {
497 let error_msg = extract_error_message(&response);
498 log::warn!("Order submission rejected by exchange: {error_msg}");
499 let ts = clock.get_time_ns();
500 emitter.emit_order_rejected(&order, &error_msg, ts, false);
501 ws_client.remove_cloid_mapping(&cloid_hex);
502 }
503 }
504 Err(e) => {
505 log::error!("Order submission HTTP request failed: {e}");
509 }
510 }
511
512 Ok(())
513 });
514
515 Ok(())
516 }
517
518 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
519 log::debug!(
520 "Submitting order list with {} orders",
521 cmd.order_list.client_order_ids.len()
522 );
523
524 let http_client = self.http_client.clone();
525
526 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
527
528 let mut valid_orders = Vec::new();
530 let mut hyperliquid_orders = Vec::new();
531
532 for order in &orders {
533 let symbol = order.instrument_id().symbol.to_string();
534 let asset = match http_client.get_asset_index(&symbol) {
535 Some(a) => a,
536 None => {
537 self.emitter
538 .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
539 continue;
540 }
541 };
542
543 match order_to_hyperliquid_request_with_asset(order, asset) {
544 Ok(req) => {
545 hyperliquid_orders.push(req);
546 valid_orders.push(order.clone());
547 }
548 Err(e) => {
549 self.emitter
550 .emit_order_denied(order, &format!("Order conversion failed: {e}"));
551 }
552 }
553 }
554
555 if valid_orders.is_empty() {
556 log::warn!("No valid orders to submit in order list");
557 return Ok(());
558 }
559
560 for order in &valid_orders {
561 let cloid = Cloid::from_client_order_id(order.client_order_id());
562 self.ws_client
563 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
564 self.emitter.emit_order_submitted(order);
565 }
566
567 let builder_fee = HyperliquidExecBuilderFee {
568 address: NAUTILUS_BUILDER_FEE_ADDRESS.to_string(),
569 fee_tenths_bp: NAUTILUS_BUILDER_FEE_TENTHS_BP,
570 };
571
572 let emitter = self.emitter.clone();
573 let clock = self.clock;
574 let ws_client = self.ws_client.clone();
575 let cloid_hexes: Vec<Ustr> = valid_orders
576 .iter()
577 .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
578 .collect();
579
580 self.spawn_task("submit_order_list", async move {
581 let action = HyperliquidExecAction::Order {
582 orders: hyperliquid_orders,
583 grouping: HyperliquidExecGrouping::Na,
584 builder: Some(builder_fee),
585 };
586 match http_client.post_action_exec(&action).await {
587 Ok(response) => {
588 if is_response_successful(&response) {
589 log::info!("Order list submitted successfully: {response:?}");
590 } else {
591 let error_msg = extract_error_message(&response);
593 log::warn!("Order list submission rejected by exchange: {error_msg}");
594 let ts = clock.get_time_ns();
595 for order in &valid_orders {
596 emitter.emit_order_rejected(order, &error_msg, ts, false);
597 }
598 for cloid_hex in &cloid_hexes {
599 ws_client.remove_cloid_mapping(cloid_hex);
600 }
601 }
602 }
603 Err(e) => {
604 log::error!("Order list submission HTTP request failed: {e}");
608 }
609 }
610
611 Ok(())
612 });
613
614 Ok(())
615 }
616
617 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
618 log::debug!("Modifying order: {cmd:?}");
619
620 let venue_order_id = match cmd.venue_order_id {
622 Some(id) => id,
623 None => {
624 log::warn!("Cannot modify order: venue_order_id is None");
625 return Ok(());
626 }
627 };
628
629 let oid: u64 = match venue_order_id.as_str().parse() {
630 Ok(id) => id,
631 Err(e) => {
632 log::warn!("Failed to parse venue_order_id '{venue_order_id}' as u64: {e}");
633 return Ok(());
634 }
635 };
636
637 let http_client = self.http_client.clone();
638 let price = cmd.price;
639 let quantity = cmd.quantity;
640 let symbol = cmd.instrument_id.symbol.to_string();
641
642 self.spawn_task("modify_order", async move {
643 let asset = match http_client.get_asset_index(&symbol) {
644 Some(a) => a,
645 None => {
646 log::warn!(
647 "Asset index not found for symbol {symbol}, ensure instruments are loaded"
648 );
649 return Ok(());
650 }
651 };
652
653 let modify_request = HyperliquidExecModifyOrderRequest {
655 asset,
656 oid,
657 price: price.map(|p| (*p).into()),
658 size: quantity.map(|q| (*q).into()),
659 reduce_only: None,
660 kind: None,
661 };
662
663 let action = HyperliquidExecAction::Modify {
664 modify: modify_request,
665 };
666
667 match http_client.post_action_exec(&action).await {
668 Ok(response) => {
669 if is_response_successful(&response) {
670 log::info!("Order modified successfully: {response:?}");
671 } else {
673 let error_msg = extract_error_message(&response);
674 log::warn!("Order modification rejected by exchange: {error_msg}");
675 }
677 }
678 Err(e) => {
679 log::warn!("Order modification HTTP request failed: {e}");
680 }
682 }
683
684 Ok(())
685 });
686
687 Ok(())
688 }
689
690 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
691 log::debug!("Cancelling order: {cmd:?}");
692
693 let http_client = self.http_client.clone();
694 let client_order_id = cmd.client_order_id.to_string();
695 let symbol = cmd.instrument_id.symbol.to_string();
696
697 self.spawn_task("cancel_order", async move {
698 let asset = match http_client.get_asset_index(&symbol) {
699 Some(a) => a,
700 None => {
701 log::warn!(
702 "Asset index not found for symbol {symbol}, ensure instruments are loaded"
703 );
704 return Ok(());
705 }
706 };
707
708 let cancel_request =
709 client_order_id_to_cancel_request_with_asset(&client_order_id, asset);
710 let action = HyperliquidExecAction::CancelByCloid {
711 cancels: vec![cancel_request],
712 };
713
714 match http_client.post_action_exec(&action).await {
715 Ok(response) => {
716 if is_response_successful(&response) {
717 log::info!("Order cancelled successfully: {response:?}");
718 } else {
719 let error_msg = extract_error_message(&response);
720 log::warn!("Order cancellation rejected by exchange: {error_msg}");
721 }
722 }
723 Err(e) => {
724 log::warn!("Order cancellation HTTP request failed: {e}");
725 }
726 }
727
728 Ok(())
729 });
730
731 Ok(())
732 }
733
734 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
735 log::debug!("Cancelling all orders: {cmd:?}");
736
737 let cache = self.core.cache();
738 let open_orders = cache.orders_open(
739 Some(&self.core.venue),
740 Some(&cmd.instrument_id),
741 None,
742 None,
743 Some(cmd.order_side),
744 );
745
746 if open_orders.is_empty() {
747 log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
748 return Ok(());
749 }
750
751 let symbol = cmd.instrument_id.symbol.to_string();
752 let client_order_ids: Vec<String> = open_orders
753 .iter()
754 .map(|o| o.client_order_id().to_string())
755 .collect();
756
757 let http_client = self.http_client.clone();
758
759 self.spawn_task("cancel_all_orders", async move {
760 let asset = match http_client.get_asset_index(&symbol) {
761 Some(a) => a,
762 None => {
763 log::warn!(
764 "Asset index not found for symbol {symbol}, ensure instruments are loaded"
765 );
766 return Ok(());
767 }
768 };
769
770 let cancel_requests: Vec<_> = client_order_ids
771 .iter()
772 .map(|id| client_order_id_to_cancel_request_with_asset(id, asset))
773 .collect();
774
775 if cancel_requests.is_empty() {
776 log::debug!("No valid cancel requests to send");
777 return Ok(());
778 }
779
780 let action = HyperliquidExecAction::CancelByCloid {
781 cancels: cancel_requests,
782 };
783 if let Err(e) = http_client.post_action_exec(&action).await {
784 log::warn!("Failed to send cancel all orders request: {e}");
785 }
786
787 Ok(())
788 });
789
790 Ok(())
791 }
792
793 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
794 log::debug!("Batch cancelling orders: {cmd:?}");
795
796 if cmd.cancels.is_empty() {
797 log::debug!("No orders to cancel in batch");
798 return Ok(());
799 }
800
801 let cancel_info: Vec<(String, String)> = cmd
802 .cancels
803 .iter()
804 .map(|c| {
805 (
806 c.client_order_id.to_string(),
807 c.instrument_id.symbol.to_string(),
808 )
809 })
810 .collect();
811
812 let http_client = self.http_client.clone();
813
814 self.spawn_task("batch_cancel_orders", async move {
815 let mut cancel_requests = Vec::new();
816
817 for (client_order_id, symbol) in &cancel_info {
818 let asset = match http_client.get_asset_index(symbol) {
819 Some(a) => a,
820 None => {
821 log::warn!("Asset index not found for symbol {symbol}, skipping cancel");
822 continue;
823 }
824 };
825 cancel_requests.push(client_order_id_to_cancel_request_with_asset(
826 client_order_id,
827 asset,
828 ));
829 }
830
831 if cancel_requests.is_empty() {
832 log::warn!("No valid cancel requests in batch");
833 return Ok(());
834 }
835
836 let action = HyperliquidExecAction::CancelByCloid {
837 cancels: cancel_requests,
838 };
839 if let Err(e) = http_client.post_action_exec(&action).await {
840 log::warn!("Failed to send batch cancel orders request: {e}");
841 }
842
843 Ok(())
844 });
845
846 Ok(())
847 }
848
849 fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
850 log::debug!("Querying account: {cmd:?}");
851
852 let runtime = get_runtime();
854 runtime.block_on(async {
855 if let Err(e) = self.refresh_account_state().await {
856 log::warn!("Failed to query account state: {e}");
857 }
858 });
859
860 Ok(())
861 }
862
863 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
864 log::debug!("Querying order: {cmd:?}");
865
866 let cache = self.core.cache();
868 let venue_order_id = cache.venue_order_id(&cmd.client_order_id);
869
870 let venue_order_id = match venue_order_id {
871 Some(oid) => *oid,
872 None => {
873 log::warn!(
874 "No venue order ID found for client order {}",
875 cmd.client_order_id
876 );
877 return Ok(());
878 }
879 };
880 drop(cache);
881
882 let oid = match u64::from_str(venue_order_id.as_ref()) {
884 Ok(id) => id,
885 Err(e) => {
886 log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
887 return Ok(());
888 }
889 };
890
891 let account_address = self.get_account_address()?;
892
893 let http_client = self.http_client.clone();
897 let runtime = get_runtime();
898 runtime.spawn(async move {
899 match http_client.info_order_status(&account_address, oid).await {
900 Ok(status) => {
901 log::debug!("Order status for oid {oid}: {status:?}");
902 }
903 Err(e) => {
904 log::warn!("Failed to query order status for oid {oid}: {e}");
905 }
906 }
907 });
908
909 Ok(())
910 }
911
912 async fn connect(&mut self) -> anyhow::Result<()> {
913 if self.core.is_connected() {
914 return Ok(());
915 }
916
917 log::info!("Connecting Hyperliquid execution client");
918
919 self.ensure_instruments_initialized_async().await?;
921
922 self.start_ws_stream().await?;
924
925 self.refresh_account_state().await?;
927 self.await_account_registered(30.0).await?;
928
929 self.core.set_connected();
930
931 log::info!("Connected: client_id={}", self.core.client_id);
932 Ok(())
933 }
934
935 async fn disconnect(&mut self) -> anyhow::Result<()> {
936 if self.core.is_disconnected() {
937 return Ok(());
938 }
939
940 log::info!("Disconnecting Hyperliquid execution client");
941
942 self.ws_client.disconnect().await?;
944
945 self.abort_pending_tasks();
947
948 self.core.set_disconnected();
949
950 log::info!("Disconnected: client_id={}", self.core.client_id);
951 Ok(())
952 }
953
954 async fn generate_order_status_report(
955 &self,
956 _cmd: &GenerateOrderStatusReport,
957 ) -> anyhow::Result<Option<OrderStatusReport>> {
958 log::warn!("generate_order_status_report not yet fully implemented");
962 Ok(None)
963 }
964
965 async fn generate_order_status_reports(
966 &self,
967 cmd: &GenerateOrderStatusReports,
968 ) -> anyhow::Result<Vec<OrderStatusReport>> {
969 let account_address = self.get_account_address()?;
970
971 let reports = self
972 .http_client
973 .request_order_status_reports(&account_address, cmd.instrument_id)
974 .await
975 .context("failed to generate order status reports")?;
976
977 let reports = if cmd.open_only {
979 reports
980 .into_iter()
981 .filter(|r| r.order_status.is_open())
982 .collect()
983 } else {
984 reports
985 };
986
987 let reports = match (cmd.start, cmd.end) {
989 (Some(start), Some(end)) => reports
990 .into_iter()
991 .filter(|r| r.ts_last >= start && r.ts_last <= end)
992 .collect(),
993 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
994 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
995 (None, None) => reports,
996 };
997
998 log::info!("Generated {} order status reports", reports.len());
999 Ok(reports)
1000 }
1001
1002 async fn generate_fill_reports(
1003 &self,
1004 cmd: GenerateFillReports,
1005 ) -> anyhow::Result<Vec<FillReport>> {
1006 let account_address = self.get_account_address()?;
1007
1008 let reports = self
1009 .http_client
1010 .request_fill_reports(&account_address, cmd.instrument_id)
1011 .await
1012 .context("failed to generate fill reports")?;
1013
1014 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1016 reports
1017 .into_iter()
1018 .filter(|r| r.ts_event >= start && r.ts_event <= end)
1019 .collect()
1020 } else if let Some(start) = cmd.start {
1021 reports
1022 .into_iter()
1023 .filter(|r| r.ts_event >= start)
1024 .collect()
1025 } else if let Some(end) = cmd.end {
1026 reports.into_iter().filter(|r| r.ts_event <= end).collect()
1027 } else {
1028 reports
1029 };
1030
1031 log::info!("Generated {} fill reports", reports.len());
1032 Ok(reports)
1033 }
1034
1035 async fn generate_position_status_reports(
1036 &self,
1037 cmd: &GeneratePositionStatusReports,
1038 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1039 let account_address = self.get_account_address()?;
1040
1041 let reports = self
1042 .http_client
1043 .request_position_status_reports(&account_address, cmd.instrument_id)
1044 .await
1045 .context("failed to generate position status reports")?;
1046
1047 log::info!("Generated {} position status reports", reports.len());
1048 Ok(reports)
1049 }
1050
1051 async fn generate_mass_status(
1052 &self,
1053 lookback_mins: Option<u64>,
1054 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1055 let ts_init = self.clock.get_time_ns();
1056
1057 let order_cmd = GenerateOrderStatusReports::new(
1058 UUID4::new(),
1059 ts_init,
1060 true, None,
1062 None,
1063 None,
1064 None,
1065 None,
1066 );
1067 let fill_cmd =
1068 GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1069 let position_cmd =
1070 GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1071
1072 let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1073 let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1074 let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1075
1076 if let Some(mins) = lookback_mins {
1079 let cutoff_ns = ts_init
1080 .as_u64()
1081 .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1082 let cutoff = UnixNanos::from(cutoff_ns);
1083
1084 fill_reports.retain(|r| r.ts_event >= cutoff);
1085 }
1086
1087 let mut mass_status = ExecutionMassStatus::new(
1088 self.core.client_id,
1089 self.core.account_id,
1090 self.core.venue,
1091 ts_init,
1092 None,
1093 );
1094 mass_status.add_order_reports(order_reports);
1095 mass_status.add_fill_reports(fill_reports);
1096 mass_status.add_position_reports(position_reports);
1097
1098 log::info!(
1099 "Generated mass status: {} orders, {} fills, {} positions",
1100 mass_status.order_reports().len(),
1101 mass_status.fill_reports().len(),
1102 mass_status.position_reports().len(),
1103 );
1104
1105 Ok(Some(mass_status))
1106 }
1107}
1108
1109impl HyperliquidExecutionClient {
1110 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1111 {
1112 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1113 if handle_guard.is_some() {
1114 return Ok(());
1115 }
1116 }
1117
1118 let user_address = self.get_user_address()?;
1119
1120 let subscription_address = self
1123 .config
1124 .vault_address
1125 .as_ref()
1126 .unwrap_or(&user_address)
1127 .clone();
1128
1129 let mut ws_client = self.ws_client.clone();
1130
1131 let instruments = self
1132 .http_client
1133 .request_instruments()
1134 .await
1135 .unwrap_or_default();
1136
1137 for instrument in instruments {
1138 ws_client.cache_instrument(instrument);
1139 }
1140
1141 ws_client.connect().await?;
1143 ws_client
1144 .subscribe_order_updates(&subscription_address)
1145 .await?;
1146 ws_client
1147 .subscribe_user_events(&subscription_address)
1148 .await?;
1149 log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1150
1151 let runtime = get_runtime();
1152 let handle = runtime.spawn(async move {
1153 let mut pending_filled: AHashSet<ClientOrderId> = AHashSet::new();
1155
1156 loop {
1157 let event = ws_client.next_event().await;
1158
1159 match event {
1160 Some(msg) => {
1161 match msg {
1162 NautilusWsMessage::ExecutionReports(reports) => {
1163 let mut terminal_ids: Vec<ClientOrderId> = Vec::new();
1164 let mut filled_ids: Vec<ClientOrderId> = Vec::new();
1165 let mut fill_ids: Vec<ClientOrderId> = Vec::new();
1166
1167 for report in &reports {
1168 match report {
1169 ExecutionReport::Order(order_report) => {
1170 if let Some(id) = order_report.client_order_id
1171 && !order_report.order_status.is_open()
1172 {
1173 if order_report.order_status
1174 == OrderStatus::Filled
1175 {
1176 filled_ids.push(id);
1177 } else {
1178 terminal_ids.push(id);
1179 }
1180 }
1181 }
1182 ExecutionReport::Fill(fill_report) => {
1183 if let Some(id) = fill_report.client_order_id {
1184 fill_ids.push(id);
1185 }
1186 }
1187 }
1188 }
1189
1190 for report in reports {
1191 dispatch_execution_report(report);
1192 }
1193
1194 for id in terminal_ids {
1195 let cloid = Cloid::from_client_order_id(id);
1196 ws_client.remove_cloid_mapping(&Ustr::from(
1197 &cloid.to_hex(),
1198 ));
1199 }
1200
1201 for id in filled_ids {
1203 pending_filled.insert(id);
1204 }
1205
1206 for id in fill_ids {
1208 if pending_filled.remove(&id) {
1209 let cloid = Cloid::from_client_order_id(id);
1210 ws_client.remove_cloid_mapping(&Ustr::from(
1211 &cloid.to_hex(),
1212 ));
1213 }
1214 }
1215 }
1216 NautilusWsMessage::Reconnected => {
1217 log::info!("WebSocket reconnected, resubscribing to user channels");
1218
1219 if let Err(e) = ws_client
1220 .subscribe_order_updates(&subscription_address)
1221 .await
1222 {
1223 log::error!(
1224 "Failed to resubscribe to order updates after reconnect: {e}"
1225 );
1226 }
1227
1228 if let Err(e) = ws_client
1229 .subscribe_user_events(&subscription_address)
1230 .await
1231 {
1232 log::error!(
1233 "Failed to resubscribe to user events after reconnect: {e}"
1234 );
1235 }
1236
1237 log::info!("Resubscribed to execution channels");
1238 }
1239 NautilusWsMessage::Error(e) => {
1240 log::error!("WebSocket error: {e}");
1241 }
1242 NautilusWsMessage::Trades(_)
1244 | NautilusWsMessage::Quote(_)
1245 | NautilusWsMessage::Deltas(_)
1246 | NautilusWsMessage::Candle(_)
1247 | NautilusWsMessage::MarkPrice(_)
1248 | NautilusWsMessage::IndexPrice(_)
1249 | NautilusWsMessage::FundingRate(_) => {}
1250 }
1251 }
1252 None => {
1253 log::warn!("WebSocket next_event returned None");
1254 break;
1255 }
1256 }
1257 }
1258 });
1259
1260 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1261 log::info!("Hyperliquid WebSocket execution stream started");
1262 Ok(())
1263 }
1264}
1265
1266fn dispatch_execution_report(report: ExecutionReport) {
1267 let sender = get_exec_event_sender();
1268 match report {
1269 ExecutionReport::Order(order_report) => {
1270 let exec_report = NautilusExecutionReport::Order(Box::new(order_report));
1271 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1272 log::warn!("Failed to send order status report: {e}");
1273 }
1274 }
1275 ExecutionReport::Fill(fill_report) => {
1276 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1277 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1278 log::warn!("Failed to send fill report: {e}");
1279 }
1280 }
1281 }
1282}