1use std::{str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use nautilus_common::{
22 messages::{
23 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
24 execution::{
25 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
26 SubmitOrder, SubmitOrderList,
27 },
28 },
29 runner::get_exec_event_sender,
30 runtime::get_runtime,
31};
32use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
33use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
34use nautilus_model::{
35 accounts::AccountAny,
36 enums::{OmsType, OrderType},
37 identifiers::{AccountId, ClientId, Venue},
38 orders::{Order, any::OrderAny},
39 types::{AccountBalance, MarginBalance},
40};
41use serde_json;
42use tokio::task::JoinHandle;
43
44use crate::{
45 common::{
46 HyperliquidProductType,
47 consts::HYPERLIQUID_VENUE,
48 credential::Secrets,
49 parse::{
50 client_order_id_to_cancel_request, extract_error_message, is_response_successful,
51 order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
52 },
53 },
54 config::HyperliquidExecClientConfig,
55 http::{client::HyperliquidHttpClient, models::ClearinghouseState, query::ExchangeAction},
56 websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
57};
58
59#[derive(Debug)]
60pub struct HyperliquidExecutionClient {
61 core: ExecutionClientCore,
62 config: HyperliquidExecClientConfig,
63 http_client: HyperliquidHttpClient,
64 ws_client: HyperliquidWebSocketClient,
65 started: bool,
66 connected: bool,
67 instruments_initialized: bool,
68 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
69 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
70}
71
72impl HyperliquidExecutionClient {
73 pub fn config(&self) -> &HyperliquidExecClientConfig {
75 &self.config
76 }
77
78 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
93 let symbol = order.instrument_id().symbol.to_string();
96 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
97 anyhow::bail!(
98 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
99 );
100 }
101
102 match order.order_type() {
104 OrderType::Market
105 | OrderType::Limit
106 | OrderType::StopMarket
107 | OrderType::StopLimit
108 | OrderType::MarketIfTouched
109 | OrderType::LimitIfTouched => {}
110 _ => anyhow::bail!(
111 "Unsupported order type for Hyperliquid: {:?}",
112 order.order_type()
113 ),
114 }
115
116 if matches!(
118 order.order_type(),
119 OrderType::StopMarket
120 | OrderType::StopLimit
121 | OrderType::MarketIfTouched
122 | OrderType::LimitIfTouched
123 ) && order.trigger_price().is_none()
124 {
125 anyhow::bail!(
126 "Conditional orders require a trigger price for Hyperliquid: {:?}",
127 order.order_type()
128 );
129 }
130
131 if matches!(
133 order.order_type(),
134 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
135 ) && order.price().is_none()
136 {
137 anyhow::bail!(
138 "Limit orders require a limit price for Hyperliquid: {:?}",
139 order.order_type()
140 );
141 }
142
143 Ok(())
144 }
145
146 pub fn new(
152 core: ExecutionClientCore,
153 config: HyperliquidExecClientConfig,
154 ) -> anyhow::Result<Self> {
155 if !config.has_credentials() {
156 anyhow::bail!("Hyperliquid execution client requires private key");
157 }
158
159 let secrets = Secrets::from_json(&format!(
160 r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
161 config.private_key, config.is_testnet
162 ))
163 .context("failed to create secrets from private key")?;
164
165 let http_client = HyperliquidHttpClient::with_credentials(
166 &secrets,
167 Some(config.http_timeout_secs),
168 config.http_proxy_url.clone(),
169 )
170 .context("failed to create Hyperliquid HTTP client")?;
171
172 let ws_client = HyperliquidWebSocketClient::new(
176 None,
177 config.is_testnet,
178 HyperliquidProductType::Perp,
179 Some(core.account_id),
180 );
181
182 Ok(Self {
183 core,
184 config,
185 http_client,
186 ws_client,
187 started: false,
188 connected: false,
189 instruments_initialized: false,
190 pending_tasks: Mutex::new(Vec::new()),
191 ws_stream_handle: Mutex::new(None),
192 })
193 }
194
195 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
196 if self.instruments_initialized {
197 return Ok(());
198 }
199
200 let instruments = self
201 .http_client
202 .request_instruments()
203 .await
204 .context("failed to request Hyperliquid instruments")?;
205
206 if instruments.is_empty() {
207 tracing::warn!(
208 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
209 );
210 } else {
211 tracing::info!("Initialized {} instruments", instruments.len());
212
213 for instrument in &instruments {
214 self.http_client.cache_instrument(instrument.clone());
215 }
216 }
217
218 self.instruments_initialized = true;
219 Ok(())
220 }
221
222 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
223 if self.instruments_initialized {
224 return Ok(());
225 }
226
227 let runtime = get_runtime();
228 runtime.block_on(self.ensure_instruments_initialized_async())
229 }
230
231 async fn refresh_account_state(&self) -> anyhow::Result<()> {
232 let user_address = self.get_user_address()?;
235
236 let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
238
239 let clearinghouse_state = self
241 .http_client
242 .info_clearinghouse_state(account_address)
243 .await
244 .context("failed to fetch clearinghouse state")?;
245
246 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
248 .context("failed to deserialize clearinghouse state")?;
249
250 tracing::debug!(
251 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
252 state.cross_margin_summary,
253 state.asset_positions.len()
254 );
255
256 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
258 let (balances, margins) =
259 crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
260 .context("failed to parse account balances and margins")?;
261
262 let ts_event = if let Some(time_ms) = state.time {
263 nautilus_core::UnixNanos::from(time_ms * 1_000_000)
264 } else {
265 nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
266 };
267
268 self.core.generate_account_state(
270 balances, margins, true, ts_event,
272 )?;
273
274 tracing::info!("Account state updated successfully");
275 } else {
276 tracing::warn!("No cross margin summary in clearinghouse state");
277 }
278
279 Ok(())
280 }
281
282 fn get_user_address(&self) -> anyhow::Result<String> {
283 let address = self
284 .http_client
285 .get_user_address()
286 .context("failed to get user address from HTTP client")?;
287
288 Ok(address)
289 }
290
291 fn spawn_task<F>(&self, description: &'static str, fut: F)
292 where
293 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
294 {
295 let runtime = get_runtime();
296 let handle = runtime.spawn(async move {
297 if let Err(e) = fut.await {
298 tracing::warn!("{description} failed: {e:?}");
299 }
300 });
301
302 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
303 tasks.retain(|handle| !handle.is_finished());
304 tasks.push(handle);
305 }
306
307 fn abort_pending_tasks(&self) {
308 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
309 for handle in tasks.drain(..) {
310 handle.abort();
311 }
312 }
313
314 fn update_account_state(&self) -> anyhow::Result<()> {
315 let runtime = get_runtime();
316 runtime.block_on(self.refresh_account_state())
317 }
318}
319
320impl ExecutionClient for HyperliquidExecutionClient {
321 fn is_connected(&self) -> bool {
322 self.connected
323 }
324
325 fn client_id(&self) -> ClientId {
326 self.core.client_id
327 }
328
329 fn account_id(&self) -> AccountId {
330 self.core.account_id
331 }
332
333 fn venue(&self) -> Venue {
334 *HYPERLIQUID_VENUE
335 }
336
337 fn oms_type(&self) -> OmsType {
338 self.core.oms_type
339 }
340
341 fn get_account(&self) -> Option<AccountAny> {
342 self.core.get_account()
343 }
344
345 fn generate_account_state(
346 &self,
347 balances: Vec<AccountBalance>,
348 margins: Vec<MarginBalance>,
349 reported: bool,
350 ts_event: UnixNanos,
351 ) -> anyhow::Result<()> {
352 self.core
353 .generate_account_state(balances, margins, reported, ts_event)
354 }
355
356 fn start(&mut self) -> anyhow::Result<()> {
357 if self.started {
358 return Ok(());
359 }
360
361 tracing::info!(
362 client_id = %self.core.client_id,
363 account_id = %self.core.account_id,
364 is_testnet = self.config.is_testnet,
365 vault_address = ?self.config.vault_address,
366 http_proxy_url = ?self.config.http_proxy_url,
367 ws_proxy_url = ?self.config.ws_proxy_url,
368 "Starting Hyperliquid execution client"
369 );
370
371 self.ensure_instruments_initialized()?;
373
374 if let Err(e) = self.update_account_state() {
376 tracing::warn!("Failed to initialize account state: {e}");
377 }
378
379 self.connected = true;
380 self.started = true;
381
382 if let Err(e) = get_runtime().block_on(self.start_ws_stream()) {
384 tracing::warn!("Failed to start WebSocket stream: {e}");
385 }
386
387 tracing::info!("Hyperliquid execution client started");
388 Ok(())
389 }
390 fn stop(&mut self) -> anyhow::Result<()> {
391 if !self.started {
392 return Ok(());
393 }
394
395 tracing::info!("Stopping Hyperliquid execution client");
396
397 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
399 handle.abort();
400 }
401
402 self.abort_pending_tasks();
404
405 if self.connected {
407 let runtime = get_runtime();
408 runtime.block_on(async {
409 if let Err(e) = self.ws_client.disconnect().await {
410 tracing::warn!("Error disconnecting WebSocket client: {e}");
411 }
412 });
413 }
414
415 self.connected = false;
416 self.started = false;
417
418 tracing::info!("Hyperliquid execution client stopped");
419 Ok(())
420 }
421
422 fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
423 let order = &command.order;
424
425 if order.is_closed() {
426 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
427 return Ok(());
428 }
429
430 if let Err(e) = self.validate_order_submission(order) {
431 self.core.generate_order_rejected(
432 order.strategy_id(),
433 order.instrument_id(),
434 order.client_order_id(),
435 &format!("validation-error: {e}"),
436 command.ts_init,
437 false,
438 );
439 return Err(e);
440 }
441
442 self.core.generate_order_submitted(
443 order.strategy_id(),
444 order.instrument_id(),
445 order.client_order_id(),
446 command.ts_init,
447 );
448
449 let http_client = self.http_client.clone();
450 let order_clone = order.clone();
451
452 self.spawn_task("submit_order", async move {
453 match order_any_to_hyperliquid_request(&order_clone) {
454 Ok(hyperliquid_order) => {
455 let action = ExchangeAction::order(vec![hyperliquid_order]);
457
458 match http_client.post_action(&action).await {
459 Ok(response) => {
460 if is_response_successful(&response) {
461 tracing::info!("Order submitted successfully: {:?}", response);
462 } else {
465 let error_msg = extract_error_message(&response);
466 tracing::warn!(
467 "Order submission rejected by exchange: {}",
468 error_msg
469 );
470 }
472 }
473 Err(e) => {
474 tracing::warn!("Order submission HTTP request failed: {e}");
475 }
477 }
478 }
479 Err(e) => {
480 tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
481 }
483 }
484
485 Ok(())
486 });
487
488 Ok(())
489 }
490
491 fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
492 tracing::debug!(
493 "Submitting order list with {} orders",
494 command.order_list.orders.len()
495 );
496
497 let http_client = self.http_client.clone();
498 let orders: Vec<OrderAny> = command.order_list.orders.clone();
499
500 for order in &orders {
502 self.core.generate_order_submitted(
503 order.strategy_id(),
504 order.instrument_id(),
505 order.client_order_id(),
506 command.ts_init,
507 );
508 }
509
510 self.spawn_task("submit_order_list", async move {
511 let order_refs: Vec<&OrderAny> = orders.iter().collect();
513 match orders_to_hyperliquid_requests(&order_refs) {
514 Ok(hyperliquid_orders) => {
515 let action = ExchangeAction::order(hyperliquid_orders);
517 match http_client.post_action(&action).await {
518 Ok(response) => {
519 if is_response_successful(&response) {
520 tracing::info!("Order list submitted successfully: {:?}", response);
521 } else {
523 let error_msg = extract_error_message(&response);
524 tracing::warn!(
525 "Order list submission rejected by exchange: {}",
526 error_msg
527 );
528 }
530 }
531 Err(e) => {
532 tracing::warn!("Order list submission HTTP request failed: {e}");
533 }
535 }
536 }
537 Err(e) => {
538 tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
539 }
540 }
541
542 Ok(())
543 });
544
545 Ok(())
546 }
547
548 fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
549 tracing::debug!("Modifying order: {:?}", command);
550
551 let oid: u64 = match command.venue_order_id.as_str().parse() {
553 Ok(id) => id,
554 Err(e) => {
555 tracing::warn!(
556 "Failed to parse venue_order_id '{}' as u64: {}",
557 command.venue_order_id,
558 e
559 );
560 return Ok(());
561 }
562 };
563
564 let http_client = self.http_client.clone();
565 let price = command.price;
566 let quantity = command.quantity;
567 let symbol = command.instrument_id.symbol.to_string();
568
569 self.spawn_task("modify_order", async move {
570 use crate::{
571 common::parse::extract_asset_id_from_symbol,
572 http::models::HyperliquidExecModifyOrderRequest,
573 };
574
575 let asset = match extract_asset_id_from_symbol(&symbol) {
577 Ok(asset) => asset,
578 Err(e) => {
579 tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
580 return Ok(());
581 }
582 };
583
584 let modify_request = HyperliquidExecModifyOrderRequest {
586 asset,
587 oid,
588 price: price.map(|p| (*p).into()),
589 size: quantity.map(|q| (*q).into()),
590 reduce_only: None,
591 kind: None,
592 };
593
594 let action = ExchangeAction::modify(oid, modify_request);
595
596 match http_client.post_action(&action).await {
597 Ok(response) => {
598 if is_response_successful(&response) {
599 tracing::info!("Order modified successfully: {:?}", response);
600 } else {
602 let error_msg = extract_error_message(&response);
603 tracing::warn!("Order modification rejected by exchange: {}", error_msg);
604 }
606 }
607 Err(e) => {
608 tracing::warn!("Order modification HTTP request failed: {e}");
609 }
611 }
612
613 Ok(())
614 });
615
616 Ok(())
617 }
618
619 fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
620 tracing::debug!("Cancelling order: {:?}", command);
621
622 let http_client = self.http_client.clone();
623 let client_order_id = command.client_order_id.to_string();
624 let symbol = command.instrument_id.symbol.to_string();
625
626 self.spawn_task("cancel_order", async move {
627 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
628 Ok(cancel_request) => {
629 let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
631 match http_client.post_action(&action).await {
632 Ok(response) => {
633 if is_response_successful(&response) {
634 tracing::info!("Order cancelled successfully: {:?}", response);
635 } else {
638 let error_msg = extract_error_message(&response);
639 tracing::warn!(
640 "Order cancellation rejected by exchange: {}",
641 error_msg
642 );
643 }
645 }
646 Err(e) => {
647 tracing::warn!("Order cancellation HTTP request failed: {e}");
648 }
650 }
651 }
652 Err(e) => {
653 tracing::warn!(
654 "Failed to convert order to Hyperliquid cancel format: {:?}",
655 e
656 );
657 }
658 }
659
660 Ok(())
661 });
662
663 Ok(())
664 }
665
666 fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
667 tracing::debug!("Cancelling all orders: {:?}", command);
668
669 let cache = self.core.cache().borrow();
671 let open_orders = cache.orders_open(
672 Some(&self.core.venue),
673 Some(&command.instrument_id),
674 None,
675 Some(command.order_side),
676 );
677
678 if open_orders.is_empty() {
679 tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
680 return Ok(());
681 }
682
683 let mut cancel_requests = Vec::new();
685 for order in open_orders {
686 let client_order_id = order.client_order_id().to_string();
687 let symbol = command.instrument_id.symbol.to_string();
688
689 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
690 Ok(req) => cancel_requests.push(req),
691 Err(e) => {
692 tracing::warn!(
693 "Failed to convert order {} to cancel request: {}",
694 client_order_id,
695 e
696 );
697 continue;
698 }
699 }
700 }
701
702 if cancel_requests.is_empty() {
703 tracing::debug!("No valid cancel requests to send");
704 return Ok(());
705 }
706
707 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
709
710 let http_client = self.http_client.clone();
713 let runtime = get_runtime();
714 runtime.spawn(async move {
715 if let Err(e) = http_client.post_action(&action).await {
716 tracing::warn!("Failed to send cancel all orders request: {e}");
717 }
718 });
719
720 Ok(())
721 }
722
723 fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
724 tracing::debug!("Batch cancelling orders: {:?}", command);
725
726 if command.cancels.is_empty() {
727 tracing::debug!("No orders to cancel in batch");
728 return Ok(());
729 }
730
731 let mut cancel_requests = Vec::new();
733 for cancel_cmd in &command.cancels {
734 let client_order_id = cancel_cmd.client_order_id.to_string();
735 let symbol = cancel_cmd.instrument_id.symbol.to_string();
736
737 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
738 Ok(req) => cancel_requests.push(req),
739 Err(e) => {
740 tracing::warn!(
741 "Failed to convert order {} to cancel request: {}",
742 client_order_id,
743 e
744 );
745 continue;
746 }
747 }
748 }
749
750 if cancel_requests.is_empty() {
751 tracing::warn!("No valid cancel requests in batch");
752 return Ok(());
753 }
754
755 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
756
757 let http_client = self.http_client.clone();
760 let runtime = get_runtime();
761 runtime.spawn(async move {
762 if let Err(e) = http_client.post_action(&action).await {
763 tracing::warn!("Failed to send batch cancel orders request: {e}");
764 }
765 });
766
767 Ok(())
768 }
769
770 fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
771 tracing::debug!("Querying account: {:?}", command);
772
773 let runtime = get_runtime();
775 runtime.block_on(async {
776 if let Err(e) = self.refresh_account_state().await {
777 tracing::warn!("Failed to query account state: {e}");
778 }
779 });
780
781 Ok(())
782 }
783
784 fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
785 tracing::debug!("Querying order: {:?}", command);
786
787 let cache = self.core.cache().borrow();
789 let venue_order_id = cache.venue_order_id(&command.client_order_id);
790
791 let venue_order_id = match venue_order_id {
792 Some(oid) => *oid,
793 None => {
794 tracing::warn!(
795 "No venue order ID found for client order {}",
796 command.client_order_id
797 );
798 return Ok(());
799 }
800 };
801 drop(cache);
802
803 let oid = match u64::from_str(venue_order_id.as_ref()) {
805 Ok(id) => id,
806 Err(e) => {
807 tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
808 return Ok(());
809 }
810 };
811
812 let user_address = self.get_user_address()?;
814
815 let http_client = self.http_client.clone();
819 let runtime = get_runtime();
820 runtime.spawn(async move {
821 match http_client.info_order_status(&user_address, oid).await {
822 Ok(status) => {
823 tracing::debug!("Order status for oid {}: {:?}", oid, status);
824 }
825 Err(e) => {
826 tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
827 }
828 }
829 });
830
831 Ok(())
832 }
833}
834
835use async_trait::async_trait;
840use nautilus_common::messages::execution::{
841 GenerateFillReports, GenerateOrderStatusReport, GeneratePositionReports,
842};
843use nautilus_live::execution::client::LiveExecutionClient;
844use nautilus_model::reports::{
845 ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport,
846};
847
848#[async_trait(?Send)]
849impl LiveExecutionClient for HyperliquidExecutionClient {
850 fn get_message_channel(
851 &self,
852 ) -> tokio::sync::mpsc::UnboundedSender<nautilus_common::messages::ExecutionEvent> {
853 get_exec_event_sender()
854 }
855
856 fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
857 self.core.clock().borrow()
858 }
859
860 async fn connect(&mut self) -> anyhow::Result<()> {
861 if self.connected {
862 return Ok(());
863 }
864
865 tracing::info!("Connecting Hyperliquid execution client");
866
867 self.ensure_instruments_initialized_async().await?;
869
870 self.ws_client.connect().await?;
872
873 let user_address = self.get_user_address()?;
875 self.ws_client
876 .subscribe_all_user_channels(&user_address)
877 .await?;
878
879 self.refresh_account_state().await?;
881
882 self.connected = true;
886 self.core.set_connected(true);
887
888 if let Err(e) = self.start_ws_stream().await {
890 tracing::warn!("Failed to start WebSocket stream: {e}");
891 }
892
893 tracing::info!(client_id = %self.core.client_id, "Connected");
894 Ok(())
895 }
896
897 async fn disconnect(&mut self) -> anyhow::Result<()> {
898 if !self.connected {
899 return Ok(());
900 }
901
902 tracing::info!("Disconnecting Hyperliquid execution client");
903
904 self.ws_client.disconnect().await?;
906
907 self.abort_pending_tasks();
909
910 self.connected = false;
911 self.core.set_connected(false);
912
913 tracing::info!(client_id = %self.core.client_id, "Disconnected");
914 Ok(())
915 }
916
917 async fn generate_order_status_report(
918 &self,
919 _cmd: &GenerateOrderStatusReport,
920 ) -> anyhow::Result<Option<OrderStatusReport>> {
921 tracing::warn!("generate_order_status_report not yet fully implemented");
925 Ok(None)
926 }
927
928 async fn generate_order_status_reports(
929 &self,
930 cmd: &GenerateOrderStatusReport,
931 ) -> anyhow::Result<Vec<OrderStatusReport>> {
932 let user_address = self.get_user_address()?;
933
934 let reports = self
935 .http_client
936 .request_order_status_reports(&user_address, cmd.instrument_id)
937 .await
938 .context("failed to generate order status reports")?;
939
940 let reports = if let Some(client_order_id) = cmd.client_order_id {
942 reports
943 .into_iter()
944 .filter(|r| r.client_order_id == Some(client_order_id))
945 .collect()
946 } else {
947 reports
948 };
949
950 tracing::info!("Generated {} order status reports", reports.len());
954 Ok(reports)
955 }
956
957 async fn generate_fill_reports(
958 &self,
959 cmd: GenerateFillReports,
960 ) -> anyhow::Result<Vec<FillReport>> {
961 let user_address = self.get_user_address()?;
962
963 let reports = self
964 .http_client
965 .request_fill_reports(&user_address, cmd.instrument_id)
966 .await
967 .context("failed to generate fill reports")?;
968
969 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
971 reports
972 .into_iter()
973 .filter(|r| r.ts_event >= start && r.ts_event <= end)
974 .collect()
975 } else if let Some(start) = cmd.start {
976 reports
977 .into_iter()
978 .filter(|r| r.ts_event >= start)
979 .collect()
980 } else if let Some(end) = cmd.end {
981 reports.into_iter().filter(|r| r.ts_event <= end).collect()
982 } else {
983 reports
984 };
985
986 tracing::info!("Generated {} fill reports", reports.len());
987 Ok(reports)
988 }
989
990 async fn generate_position_status_reports(
991 &self,
992 cmd: &GeneratePositionReports,
993 ) -> anyhow::Result<Vec<PositionStatusReport>> {
994 let user_address = self.get_user_address()?;
995
996 let reports = self
997 .http_client
998 .request_position_status_reports(&user_address, cmd.instrument_id)
999 .await
1000 .context("failed to generate position status reports")?;
1001
1002 tracing::info!("Generated {} position status reports", reports.len());
1003 Ok(reports)
1004 }
1005
1006 async fn generate_mass_status(
1007 &self,
1008 lookback_mins: Option<u64>,
1009 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1010 tracing::warn!(
1011 "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
1012 );
1013 Ok(None)
1019 }
1020}
1021
1022impl HyperliquidExecutionClient {
1023 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1024 {
1025 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1026 if handle_guard.is_some() {
1027 return Ok(());
1028 }
1029 }
1030
1031 let user_address = self.get_user_address()?;
1032 let _account_id = self.core.account_id;
1033 let mut ws_client = self.ws_client.clone();
1034
1035 let instruments = self
1036 .http_client
1037 .request_instruments()
1038 .await
1039 .unwrap_or_default();
1040
1041 for instrument in instruments {
1042 ws_client.cache_instrument(instrument);
1043 }
1044
1045 let runtime = get_runtime();
1046 let handle = runtime.spawn(async move {
1047 if let Err(e) = ws_client.connect().await {
1048 tracing::warn!("Failed to connect WebSocket: {e}");
1049 return;
1050 }
1051
1052 if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1053 tracing::warn!("Failed to subscribe to order updates: {e}");
1054 return;
1055 }
1056
1057 if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1058 tracing::warn!("Failed to subscribe to user events: {e}");
1059 return;
1060 }
1061
1062 tracing::info!("Subscribed to Hyperliquid execution updates");
1063
1064 let _clock = get_atomic_clock_realtime();
1065
1066 loop {
1067 let event = ws_client.next_event().await;
1068
1069 match event {
1070 Some(msg) => {
1071 match msg {
1072 NautilusWsMessage::ExecutionReports(reports) => {
1073 for report in reports {
1075 dispatch_execution_report(report);
1076 }
1077 }
1078 NautilusWsMessage::Reconnected => {
1079 tracing::info!("WebSocket reconnected");
1080 }
1082 NautilusWsMessage::Error(e) => {
1083 tracing::error!("WebSocket error: {e}");
1084 }
1085 NautilusWsMessage::Trades(_)
1087 | NautilusWsMessage::Quote(_)
1088 | NautilusWsMessage::Deltas(_)
1089 | NautilusWsMessage::Candle(_)
1090 | NautilusWsMessage::MarkPrice(_)
1091 | NautilusWsMessage::IndexPrice(_)
1092 | NautilusWsMessage::FundingRate(_) => {}
1093 }
1094 }
1095 None => {
1096 tracing::warn!("WebSocket next_event returned None");
1097 break;
1098 }
1099 }
1100 }
1101 });
1102
1103 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1104 tracing::info!("Hyperliquid WebSocket execution stream started");
1105 Ok(())
1106 }
1107}
1108
1109fn dispatch_execution_report(report: ExecutionReport) {
1110 let sender = get_exec_event_sender();
1111 match report {
1112 ExecutionReport::Order(order_report) => {
1113 let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1114 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1115 tracing::warn!("Failed to send order status report: {e}");
1116 }
1117 }
1118 ExecutionReport::Fill(fill_report) => {
1119 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1120 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1121 tracing::warn!("Failed to send fill report: {e}");
1122 }
1123 }
1124 }
1125}