1use std::{str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use nautilus_common::{
23 clients::ExecutionClient,
24 live::{runner::get_exec_event_sender, runtime::get_runtime},
25 messages::{
26 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
27 execution::{
28 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
29 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
30 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
31 },
32 },
33};
34use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
35use nautilus_live::ExecutionClientCore;
36use nautilus_model::{
37 accounts::AccountAny,
38 enums::{OmsType, OrderType},
39 identifiers::{AccountId, ClientId, Venue},
40 orders::{Order, any::OrderAny},
41 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, MarginBalance},
43};
44use serde_json;
45use tokio::task::JoinHandle;
46
47use crate::{
48 common::{
49 HyperliquidProductType,
50 consts::HYPERLIQUID_VENUE,
51 credential::Secrets,
52 parse::{
53 client_order_id_to_cancel_request, extract_error_message, is_response_successful,
54 order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
55 },
56 },
57 config::HyperliquidExecClientConfig,
58 http::{client::HyperliquidHttpClient, models::ClearinghouseState, query::ExchangeAction},
59 websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
60};
61
62#[derive(Debug)]
63pub struct HyperliquidExecutionClient {
64 core: ExecutionClientCore,
65 config: HyperliquidExecClientConfig,
66 http_client: HyperliquidHttpClient,
67 ws_client: HyperliquidWebSocketClient,
68 started: bool,
69 connected: bool,
70 instruments_initialized: bool,
71 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
72 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
73}
74
75impl HyperliquidExecutionClient {
76 pub fn config(&self) -> &HyperliquidExecClientConfig {
78 &self.config
79 }
80
81 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
96 let instrument_id = order.instrument_id();
99 let symbol = instrument_id.symbol.as_str();
100 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
101 anyhow::bail!(
102 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
103 );
104 }
105
106 match order.order_type() {
108 OrderType::Market
109 | OrderType::Limit
110 | OrderType::StopMarket
111 | OrderType::StopLimit
112 | OrderType::MarketIfTouched
113 | OrderType::LimitIfTouched => {}
114 _ => anyhow::bail!(
115 "Unsupported order type for Hyperliquid: {:?}",
116 order.order_type()
117 ),
118 }
119
120 if matches!(
122 order.order_type(),
123 OrderType::StopMarket
124 | OrderType::StopLimit
125 | OrderType::MarketIfTouched
126 | OrderType::LimitIfTouched
127 ) && order.trigger_price().is_none()
128 {
129 anyhow::bail!(
130 "Conditional orders require a trigger price for Hyperliquid: {:?}",
131 order.order_type()
132 );
133 }
134
135 if matches!(
137 order.order_type(),
138 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
139 ) && order.price().is_none()
140 {
141 anyhow::bail!(
142 "Limit orders require a limit price for Hyperliquid: {:?}",
143 order.order_type()
144 );
145 }
146
147 Ok(())
148 }
149
150 pub fn new(
156 core: ExecutionClientCore,
157 config: HyperliquidExecClientConfig,
158 ) -> anyhow::Result<Self> {
159 if !config.has_credentials() {
160 anyhow::bail!("Hyperliquid execution client requires private key");
161 }
162
163 let secrets = Secrets::from_json(&format!(
164 r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
165 config.private_key, config.is_testnet
166 ))
167 .context("failed to create secrets from private key")?;
168
169 let http_client = HyperliquidHttpClient::with_credentials(
170 &secrets,
171 Some(config.http_timeout_secs),
172 config.http_proxy_url.clone(),
173 )
174 .context("failed to create Hyperliquid HTTP client")?;
175
176 let ws_client = HyperliquidWebSocketClient::new(
180 None,
181 config.is_testnet,
182 HyperliquidProductType::Perp,
183 Some(core.account_id),
184 );
185
186 Ok(Self {
187 core,
188 config,
189 http_client,
190 ws_client,
191 started: false,
192 connected: false,
193 instruments_initialized: false,
194 pending_tasks: Mutex::new(Vec::new()),
195 ws_stream_handle: Mutex::new(None),
196 })
197 }
198
199 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
200 if self.instruments_initialized {
201 return Ok(());
202 }
203
204 let instruments = self
205 .http_client
206 .request_instruments()
207 .await
208 .context("failed to request Hyperliquid instruments")?;
209
210 if instruments.is_empty() {
211 log::warn!(
212 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
213 );
214 } else {
215 log::info!("Initialized {} instruments", instruments.len());
216
217 for instrument in &instruments {
218 self.http_client.cache_instrument(instrument.clone());
219 }
220 }
221
222 self.instruments_initialized = true;
223 Ok(())
224 }
225
226 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
227 if self.instruments_initialized {
228 return Ok(());
229 }
230
231 let runtime = get_runtime();
232 runtime.block_on(self.ensure_instruments_initialized_async())
233 }
234
235 async fn refresh_account_state(&self) -> anyhow::Result<()> {
236 let user_address = self.get_user_address()?;
239
240 let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
242
243 let clearinghouse_state = self
245 .http_client
246 .info_clearinghouse_state(account_address)
247 .await
248 .context("failed to fetch clearinghouse state")?;
249
250 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
252 .context("failed to deserialize clearinghouse state")?;
253
254 log::debug!(
255 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
256 state.cross_margin_summary,
257 state.asset_positions.len()
258 );
259
260 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
262 let (balances, margins) =
263 crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
264 .context("failed to parse account balances and margins")?;
265
266 let ts_event = if let Some(time_ms) = state.time {
267 nautilus_core::UnixNanos::from(time_ms * 1_000_000)
268 } else {
269 nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
270 };
271
272 self.core.generate_account_state(
274 balances, margins, true, ts_event,
276 )?;
277
278 log::info!("Account state updated successfully");
279 } else {
280 log::warn!("No cross margin summary in clearinghouse state");
281 }
282
283 Ok(())
284 }
285
286 fn get_user_address(&self) -> anyhow::Result<String> {
287 let address = self
288 .http_client
289 .get_user_address()
290 .context("failed to get user address from HTTP client")?;
291
292 Ok(address)
293 }
294
295 fn spawn_task<F>(&self, description: &'static str, fut: F)
296 where
297 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
298 {
299 let runtime = get_runtime();
300 let handle = runtime.spawn(async move {
301 if let Err(e) = fut.await {
302 log::warn!("{description} failed: {e:?}");
303 }
304 });
305
306 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
307 tasks.retain(|handle| !handle.is_finished());
308 tasks.push(handle);
309 }
310
311 fn abort_pending_tasks(&self) {
312 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
313 for handle in tasks.drain(..) {
314 handle.abort();
315 }
316 }
317
318 fn update_account_state(&self) -> anyhow::Result<()> {
319 let runtime = get_runtime();
320 runtime.block_on(self.refresh_account_state())
321 }
322}
323
324#[async_trait(?Send)]
325impl ExecutionClient for HyperliquidExecutionClient {
326 fn is_connected(&self) -> bool {
327 self.connected
328 }
329
330 fn client_id(&self) -> ClientId {
331 self.core.client_id
332 }
333
334 fn account_id(&self) -> AccountId {
335 self.core.account_id
336 }
337
338 fn venue(&self) -> Venue {
339 *HYPERLIQUID_VENUE
340 }
341
342 fn oms_type(&self) -> OmsType {
343 self.core.oms_type
344 }
345
346 fn get_account(&self) -> Option<AccountAny> {
347 self.core.get_account()
348 }
349
350 fn generate_account_state(
351 &self,
352 balances: Vec<AccountBalance>,
353 margins: Vec<MarginBalance>,
354 reported: bool,
355 ts_event: UnixNanos,
356 ) -> anyhow::Result<()> {
357 self.core
358 .generate_account_state(balances, margins, reported, ts_event)
359 }
360
361 fn start(&mut self) -> anyhow::Result<()> {
362 if self.started {
363 return Ok(());
364 }
365
366 log::info!(
367 "Starting Hyperliquid execution client: client_id={}, account_id={}, is_testnet={}, vault_address={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
368 self.core.client_id,
369 self.core.account_id,
370 self.config.is_testnet,
371 self.config.vault_address,
372 self.config.http_proxy_url,
373 self.config.ws_proxy_url,
374 );
375
376 self.ensure_instruments_initialized()?;
378
379 if let Err(e) = self.update_account_state() {
381 log::warn!("Failed to initialize account state: {e}");
382 }
383
384 self.connected = true;
385 self.started = true;
386
387 if let Err(e) = get_runtime().block_on(self.start_ws_stream()) {
389 log::warn!("Failed to start WebSocket stream: {e}");
390 }
391
392 log::info!("Hyperliquid execution client started");
393 Ok(())
394 }
395 fn stop(&mut self) -> anyhow::Result<()> {
396 if !self.started {
397 return Ok(());
398 }
399
400 log::info!("Stopping Hyperliquid execution client");
401
402 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
404 handle.abort();
405 }
406
407 self.abort_pending_tasks();
409
410 if self.connected {
412 let runtime = get_runtime();
413 runtime.block_on(async {
414 if let Err(e) = self.ws_client.disconnect().await {
415 log::warn!("Error disconnecting WebSocket client: {e}");
416 }
417 });
418 }
419
420 self.connected = false;
421 self.started = false;
422
423 log::info!("Hyperliquid execution client stopped");
424 Ok(())
425 }
426
427 fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
428 let order = &command.order;
429
430 if order.is_closed() {
431 log::warn!("Cannot submit closed order {}", order.client_order_id());
432 return Ok(());
433 }
434
435 if let Err(e) = self.validate_order_submission(order) {
436 self.core.generate_order_rejected(
437 order.strategy_id(),
438 order.instrument_id(),
439 order.client_order_id(),
440 &format!("validation-error: {e}"),
441 command.ts_init,
442 false,
443 );
444 return Err(e);
445 }
446
447 self.core.generate_order_submitted(
448 order.strategy_id(),
449 order.instrument_id(),
450 order.client_order_id(),
451 command.ts_init,
452 );
453
454 let http_client = self.http_client.clone();
455 let order_clone = order.clone();
456
457 self.spawn_task("submit_order", async move {
458 match order_any_to_hyperliquid_request(&order_clone) {
459 Ok(hyperliquid_order) => {
460 let action = ExchangeAction::order(vec![hyperliquid_order]);
462
463 match http_client.post_action(&action).await {
464 Ok(response) => {
465 if is_response_successful(&response) {
466 log::info!("Order submitted successfully: {response:?}");
467 } else {
470 let error_msg = extract_error_message(&response);
471 log::warn!("Order submission rejected by exchange: {error_msg}");
472 }
474 }
475 Err(e) => {
476 log::warn!("Order submission HTTP request failed: {e}");
477 }
479 }
480 }
481 Err(e) => {
482 log::warn!("Failed to convert order to Hyperliquid format: {e}");
483 }
485 }
486
487 Ok(())
488 });
489
490 Ok(())
491 }
492
493 fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
494 log::debug!(
495 "Submitting order list with {} orders",
496 command.order_list.orders.len()
497 );
498
499 let http_client = self.http_client.clone();
500 let orders: Vec<OrderAny> = command.order_list.orders.clone();
501
502 for order in &orders {
504 self.core.generate_order_submitted(
505 order.strategy_id(),
506 order.instrument_id(),
507 order.client_order_id(),
508 command.ts_init,
509 );
510 }
511
512 self.spawn_task("submit_order_list", async move {
513 let order_refs: Vec<&OrderAny> = orders.iter().collect();
515 match orders_to_hyperliquid_requests(&order_refs) {
516 Ok(hyperliquid_orders) => {
517 let action = ExchangeAction::order(hyperliquid_orders);
519 match http_client.post_action(&action).await {
520 Ok(response) => {
521 if is_response_successful(&response) {
522 log::info!("Order list submitted successfully: {response:?}");
523 } else {
525 let error_msg = extract_error_message(&response);
526 log::warn!(
527 "Order list submission rejected by exchange: {error_msg}"
528 );
529 }
531 }
532 Err(e) => {
533 log::warn!("Order list submission HTTP request failed: {e}");
534 }
536 }
537 }
538 Err(e) => {
539 log::warn!("Failed to convert order list to Hyperliquid format: {e}");
540 }
541 }
542
543 Ok(())
544 });
545
546 Ok(())
547 }
548
549 fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
550 log::debug!("Modifying order: {command:?}");
551
552 let venue_order_id = match command.venue_order_id {
554 Some(id) => id,
555 None => {
556 log::warn!("Cannot modify order: venue_order_id is None");
557 return Ok(());
558 }
559 };
560
561 let oid: u64 = match venue_order_id.as_str().parse() {
562 Ok(id) => id,
563 Err(e) => {
564 log::warn!("Failed to parse venue_order_id '{venue_order_id}' as u64: {e}");
565 return Ok(());
566 }
567 };
568
569 let http_client = self.http_client.clone();
570 let price = command.price;
571 let quantity = command.quantity;
572 let symbol = command.instrument_id.symbol.inner();
573
574 self.spawn_task("modify_order", async move {
575 use crate::{
576 common::parse::extract_asset_id_from_symbol,
577 http::models::HyperliquidExecModifyOrderRequest,
578 };
579
580 let asset = match extract_asset_id_from_symbol(&symbol) {
582 Ok(asset) => asset,
583 Err(e) => {
584 log::warn!("Failed to extract asset ID from symbol {symbol}: {e}");
585 return Ok(());
586 }
587 };
588
589 let modify_request = HyperliquidExecModifyOrderRequest {
591 asset,
592 oid,
593 price: price.map(|p| (*p).into()),
594 size: quantity.map(|q| (*q).into()),
595 reduce_only: None,
596 kind: None,
597 };
598
599 let action = ExchangeAction::modify(oid, modify_request);
600
601 match http_client.post_action(&action).await {
602 Ok(response) => {
603 if is_response_successful(&response) {
604 log::info!("Order modified successfully: {response:?}");
605 } else {
607 let error_msg = extract_error_message(&response);
608 log::warn!("Order modification rejected by exchange: {error_msg}");
609 }
611 }
612 Err(e) => {
613 log::warn!("Order modification HTTP request failed: {e}");
614 }
616 }
617
618 Ok(())
619 });
620
621 Ok(())
622 }
623
624 fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
625 log::debug!("Cancelling order: {command:?}");
626
627 let http_client = self.http_client.clone();
628 let client_order_id = command.client_order_id.inner();
629 let symbol = command.instrument_id.symbol.inner();
630
631 self.spawn_task("cancel_order", async move {
632 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
633 Ok(cancel_request) => {
634 let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
636 match http_client.post_action(&action).await {
637 Ok(response) => {
638 if is_response_successful(&response) {
639 log::info!("Order cancelled successfully: {response:?}");
640 } else {
643 let error_msg = extract_error_message(&response);
644 log::warn!("Order cancellation rejected by exchange: {error_msg}");
645 }
647 }
648 Err(e) => {
649 log::warn!("Order cancellation HTTP request failed: {e}");
650 }
652 }
653 }
654 Err(e) => {
655 log::warn!("Failed to convert order to Hyperliquid cancel format: {e:?}");
656 }
657 }
658
659 Ok(())
660 });
661
662 Ok(())
663 }
664
665 fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
666 log::debug!("Cancelling all orders: {command:?}");
667
668 let cache = self.core.cache().borrow();
670 let open_orders = cache.orders_open(
671 Some(&self.core.venue),
672 Some(&command.instrument_id),
673 None,
674 Some(command.order_side),
675 );
676
677 if open_orders.is_empty() {
678 log::debug!("No open orders to cancel for {:?}", command.instrument_id);
679 return Ok(());
680 }
681
682 let mut cancel_requests = Vec::new();
684 let symbol = command.instrument_id.symbol.inner();
685 for order in open_orders {
686 let client_order_id = order.client_order_id().inner();
687
688 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
689 Ok(req) => cancel_requests.push(req),
690 Err(e) => {
691 log::warn!("Failed to convert order {client_order_id} to cancel request: {e}");
692 continue;
693 }
694 }
695 }
696
697 if cancel_requests.is_empty() {
698 log::debug!("No valid cancel requests to send");
699 return Ok(());
700 }
701
702 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
704
705 let http_client = self.http_client.clone();
708 let runtime = get_runtime();
709 runtime.spawn(async move {
710 if let Err(e) = http_client.post_action(&action).await {
711 log::warn!("Failed to send cancel all orders request: {e}");
712 }
713 });
714
715 Ok(())
716 }
717
718 fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
719 log::debug!("Batch cancelling orders: {command:?}");
720
721 if command.cancels.is_empty() {
722 log::debug!("No orders to cancel in batch");
723 return Ok(());
724 }
725
726 let mut cancel_requests = Vec::new();
728 for cancel_cmd in &command.cancels {
729 let client_order_id = cancel_cmd.client_order_id.inner();
730 let symbol = cancel_cmd.instrument_id.symbol.inner();
731
732 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
733 Ok(req) => cancel_requests.push(req),
734 Err(e) => {
735 log::warn!("Failed to convert order {client_order_id} to cancel request: {e}");
736 continue;
737 }
738 }
739 }
740
741 if cancel_requests.is_empty() {
742 log::warn!("No valid cancel requests in batch");
743 return Ok(());
744 }
745
746 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
747
748 let http_client = self.http_client.clone();
751 let runtime = get_runtime();
752 runtime.spawn(async move {
753 if let Err(e) = http_client.post_action(&action).await {
754 log::warn!("Failed to send batch cancel orders request: {e}");
755 }
756 });
757
758 Ok(())
759 }
760
761 fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
762 log::debug!("Querying account: {command:?}");
763
764 let runtime = get_runtime();
766 runtime.block_on(async {
767 if let Err(e) = self.refresh_account_state().await {
768 log::warn!("Failed to query account state: {e}");
769 }
770 });
771
772 Ok(())
773 }
774
775 fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
776 log::debug!("Querying order: {command:?}");
777
778 let cache = self.core.cache().borrow();
780 let venue_order_id = cache.venue_order_id(&command.client_order_id);
781
782 let venue_order_id = match venue_order_id {
783 Some(oid) => *oid,
784 None => {
785 log::warn!(
786 "No venue order ID found for client order {}",
787 command.client_order_id
788 );
789 return Ok(());
790 }
791 };
792 drop(cache);
793
794 let oid = match u64::from_str(venue_order_id.as_ref()) {
796 Ok(id) => id,
797 Err(e) => {
798 log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
799 return Ok(());
800 }
801 };
802
803 let user_address = self.get_user_address()?;
805
806 let http_client = self.http_client.clone();
810 let runtime = get_runtime();
811 runtime.spawn(async move {
812 match http_client.info_order_status(&user_address, oid).await {
813 Ok(status) => {
814 log::debug!("Order status for oid {oid}: {status:?}");
815 }
816 Err(e) => {
817 log::warn!("Failed to query order status for oid {oid}: {e}");
818 }
819 }
820 });
821
822 Ok(())
823 }
824
825 async fn connect(&mut self) -> anyhow::Result<()> {
826 if self.connected {
827 return Ok(());
828 }
829
830 log::info!("Connecting Hyperliquid execution client");
831
832 self.ensure_instruments_initialized_async().await?;
834
835 self.ws_client.connect().await?;
837
838 let user_address = self.get_user_address()?;
840 self.ws_client
841 .subscribe_all_user_channels(&user_address)
842 .await?;
843
844 self.refresh_account_state().await?;
846
847 self.connected = true;
848 self.core.set_connected(true);
849
850 if let Err(e) = self.start_ws_stream().await {
852 log::warn!("Failed to start WebSocket stream: {e}");
853 }
854
855 log::info!("Connected: client_id={}", self.core.client_id);
856 Ok(())
857 }
858
859 async fn disconnect(&mut self) -> anyhow::Result<()> {
860 if !self.connected {
861 return Ok(());
862 }
863
864 log::info!("Disconnecting Hyperliquid execution client");
865
866 self.ws_client.disconnect().await?;
868
869 self.abort_pending_tasks();
871
872 self.connected = false;
873 self.core.set_connected(false);
874
875 log::info!("Disconnected: client_id={}", self.core.client_id);
876 Ok(())
877 }
878
879 async fn generate_order_status_report(
880 &self,
881 _cmd: &GenerateOrderStatusReport,
882 ) -> anyhow::Result<Option<OrderStatusReport>> {
883 log::warn!("generate_order_status_report not yet fully implemented");
887 Ok(None)
888 }
889
890 async fn generate_order_status_reports(
891 &self,
892 cmd: &GenerateOrderStatusReports,
893 ) -> anyhow::Result<Vec<OrderStatusReport>> {
894 let user_address = self.get_user_address()?;
895
896 let reports = self
897 .http_client
898 .request_order_status_reports(&user_address, cmd.instrument_id)
899 .await
900 .context("failed to generate order status reports")?;
901
902 let reports = if cmd.open_only {
904 reports
905 .into_iter()
906 .filter(|r| r.order_status.is_open())
907 .collect()
908 } else {
909 reports
910 };
911
912 let reports = match (cmd.start, cmd.end) {
914 (Some(start), Some(end)) => reports
915 .into_iter()
916 .filter(|r| r.ts_last >= start && r.ts_last <= end)
917 .collect(),
918 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
919 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
920 (None, None) => reports,
921 };
922
923 log::info!("Generated {} order status reports", reports.len());
924 Ok(reports)
925 }
926
927 async fn generate_fill_reports(
928 &self,
929 cmd: GenerateFillReports,
930 ) -> anyhow::Result<Vec<FillReport>> {
931 let user_address = self.get_user_address()?;
932
933 let reports = self
934 .http_client
935 .request_fill_reports(&user_address, cmd.instrument_id)
936 .await
937 .context("failed to generate fill reports")?;
938
939 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
941 reports
942 .into_iter()
943 .filter(|r| r.ts_event >= start && r.ts_event <= end)
944 .collect()
945 } else if let Some(start) = cmd.start {
946 reports
947 .into_iter()
948 .filter(|r| r.ts_event >= start)
949 .collect()
950 } else if let Some(end) = cmd.end {
951 reports.into_iter().filter(|r| r.ts_event <= end).collect()
952 } else {
953 reports
954 };
955
956 log::info!("Generated {} fill reports", reports.len());
957 Ok(reports)
958 }
959
960 async fn generate_position_status_reports(
961 &self,
962 cmd: &GeneratePositionStatusReports,
963 ) -> anyhow::Result<Vec<PositionStatusReport>> {
964 let user_address = self.get_user_address()?;
965
966 let reports = self
967 .http_client
968 .request_position_status_reports(&user_address, cmd.instrument_id)
969 .await
970 .context("failed to generate position status reports")?;
971
972 log::info!("Generated {} position status reports", reports.len());
973 Ok(reports)
974 }
975
976 async fn generate_mass_status(
977 &self,
978 lookback_mins: Option<u64>,
979 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
980 log::warn!("generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})");
981 Ok(None)
987 }
988}
989
990impl HyperliquidExecutionClient {
991 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
992 {
993 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
994 if handle_guard.is_some() {
995 return Ok(());
996 }
997 }
998
999 let user_address = self.get_user_address()?;
1000 let _account_id = self.core.account_id;
1001 let mut ws_client = self.ws_client.clone();
1002
1003 let instruments = self
1004 .http_client
1005 .request_instruments()
1006 .await
1007 .unwrap_or_default();
1008
1009 for instrument in instruments {
1010 ws_client.cache_instrument(instrument);
1011 }
1012
1013 let runtime = get_runtime();
1014 let handle = runtime.spawn(async move {
1015 if let Err(e) = ws_client.connect().await {
1016 log::warn!("Failed to connect WebSocket: {e}");
1017 return;
1018 }
1019
1020 if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1021 log::warn!("Failed to subscribe to order updates: {e}");
1022 return;
1023 }
1024
1025 if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1026 log::warn!("Failed to subscribe to user events: {e}");
1027 return;
1028 }
1029
1030 log::info!("Subscribed to Hyperliquid execution updates");
1031
1032 let _clock = get_atomic_clock_realtime();
1033
1034 loop {
1035 let event = ws_client.next_event().await;
1036
1037 match event {
1038 Some(msg) => {
1039 match msg {
1040 NautilusWsMessage::ExecutionReports(reports) => {
1041 for report in reports {
1043 dispatch_execution_report(report);
1044 }
1045 }
1046 NautilusWsMessage::Reconnected => {
1047 log::info!("WebSocket reconnected");
1048 }
1050 NautilusWsMessage::Error(e) => {
1051 log::error!("WebSocket error: {e}");
1052 }
1053 NautilusWsMessage::Trades(_)
1055 | NautilusWsMessage::Quote(_)
1056 | NautilusWsMessage::Deltas(_)
1057 | NautilusWsMessage::Candle(_)
1058 | NautilusWsMessage::MarkPrice(_)
1059 | NautilusWsMessage::IndexPrice(_)
1060 | NautilusWsMessage::FundingRate(_) => {}
1061 }
1062 }
1063 None => {
1064 log::warn!("WebSocket next_event returned None");
1065 break;
1066 }
1067 }
1068 }
1069 });
1070
1071 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1072 log::info!("Hyperliquid WebSocket execution stream started");
1073 Ok(())
1074 }
1075}
1076
1077fn dispatch_execution_report(report: ExecutionReport) {
1078 let sender = get_exec_event_sender();
1079 match report {
1080 ExecutionReport::Order(order_report) => {
1081 let exec_report = NautilusExecutionReport::Order(Box::new(order_report));
1082 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1083 log::warn!("Failed to send order status report: {e}");
1084 }
1085 }
1086 ExecutionReport::Fill(fill_report) => {
1087 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1088 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1089 log::warn!("Failed to send fill report: {e}");
1090 }
1091 }
1092 }
1093}