1use std::{str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use nautilus_common::{
23 live::{runner::get_exec_event_sender, runtime::get_runtime},
24 messages::{
25 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
26 execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
29 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
30 },
31 },
32};
33use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
34use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
35use nautilus_model::{
36 accounts::AccountAny,
37 enums::{OmsType, OrderType},
38 identifiers::{AccountId, ClientId, Venue},
39 orders::{Order, any::OrderAny},
40 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
41 types::{AccountBalance, MarginBalance},
42};
43use serde_json;
44use tokio::task::JoinHandle;
45
46use crate::{
47 common::{
48 HyperliquidProductType,
49 consts::HYPERLIQUID_VENUE,
50 credential::Secrets,
51 parse::{
52 client_order_id_to_cancel_request, extract_error_message, is_response_successful,
53 order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
54 },
55 },
56 config::HyperliquidExecClientConfig,
57 http::{client::HyperliquidHttpClient, models::ClearinghouseState, query::ExchangeAction},
58 websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
59};
60
61#[derive(Debug)]
62pub struct HyperliquidExecutionClient {
63 core: ExecutionClientCore,
64 config: HyperliquidExecClientConfig,
65 http_client: HyperliquidHttpClient,
66 ws_client: HyperliquidWebSocketClient,
67 started: bool,
68 connected: bool,
69 instruments_initialized: bool,
70 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
71 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
72}
73
74impl HyperliquidExecutionClient {
75 pub fn config(&self) -> &HyperliquidExecClientConfig {
77 &self.config
78 }
79
80 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
95 let instrument_id = order.instrument_id();
98 let symbol = instrument_id.symbol.as_str();
99 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
100 anyhow::bail!(
101 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
102 );
103 }
104
105 match order.order_type() {
107 OrderType::Market
108 | OrderType::Limit
109 | OrderType::StopMarket
110 | OrderType::StopLimit
111 | OrderType::MarketIfTouched
112 | OrderType::LimitIfTouched => {}
113 _ => anyhow::bail!(
114 "Unsupported order type for Hyperliquid: {:?}",
115 order.order_type()
116 ),
117 }
118
119 if matches!(
121 order.order_type(),
122 OrderType::StopMarket
123 | OrderType::StopLimit
124 | OrderType::MarketIfTouched
125 | OrderType::LimitIfTouched
126 ) && order.trigger_price().is_none()
127 {
128 anyhow::bail!(
129 "Conditional orders require a trigger price for Hyperliquid: {:?}",
130 order.order_type()
131 );
132 }
133
134 if matches!(
136 order.order_type(),
137 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
138 ) && order.price().is_none()
139 {
140 anyhow::bail!(
141 "Limit orders require a limit price for Hyperliquid: {:?}",
142 order.order_type()
143 );
144 }
145
146 Ok(())
147 }
148
149 pub fn new(
155 core: ExecutionClientCore,
156 config: HyperliquidExecClientConfig,
157 ) -> anyhow::Result<Self> {
158 if !config.has_credentials() {
159 anyhow::bail!("Hyperliquid execution client requires private key");
160 }
161
162 let secrets = Secrets::from_json(&format!(
163 r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
164 config.private_key, config.is_testnet
165 ))
166 .context("failed to create secrets from private key")?;
167
168 let http_client = HyperliquidHttpClient::with_credentials(
169 &secrets,
170 Some(config.http_timeout_secs),
171 config.http_proxy_url.clone(),
172 )
173 .context("failed to create Hyperliquid HTTP client")?;
174
175 let ws_client = HyperliquidWebSocketClient::new(
179 None,
180 config.is_testnet,
181 HyperliquidProductType::Perp,
182 Some(core.account_id),
183 );
184
185 Ok(Self {
186 core,
187 config,
188 http_client,
189 ws_client,
190 started: false,
191 connected: false,
192 instruments_initialized: false,
193 pending_tasks: Mutex::new(Vec::new()),
194 ws_stream_handle: Mutex::new(None),
195 })
196 }
197
198 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
199 if self.instruments_initialized {
200 return Ok(());
201 }
202
203 let instruments = self
204 .http_client
205 .request_instruments()
206 .await
207 .context("failed to request Hyperliquid instruments")?;
208
209 if instruments.is_empty() {
210 tracing::warn!(
211 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
212 );
213 } else {
214 tracing::info!("Initialized {} instruments", instruments.len());
215
216 for instrument in &instruments {
217 self.http_client.cache_instrument(instrument.clone());
218 }
219 }
220
221 self.instruments_initialized = true;
222 Ok(())
223 }
224
225 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
226 if self.instruments_initialized {
227 return Ok(());
228 }
229
230 let runtime = get_runtime();
231 runtime.block_on(self.ensure_instruments_initialized_async())
232 }
233
234 async fn refresh_account_state(&self) -> anyhow::Result<()> {
235 let user_address = self.get_user_address()?;
238
239 let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
241
242 let clearinghouse_state = self
244 .http_client
245 .info_clearinghouse_state(account_address)
246 .await
247 .context("failed to fetch clearinghouse state")?;
248
249 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
251 .context("failed to deserialize clearinghouse state")?;
252
253 tracing::debug!(
254 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
255 state.cross_margin_summary,
256 state.asset_positions.len()
257 );
258
259 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
261 let (balances, margins) =
262 crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
263 .context("failed to parse account balances and margins")?;
264
265 let ts_event = if let Some(time_ms) = state.time {
266 nautilus_core::UnixNanos::from(time_ms * 1_000_000)
267 } else {
268 nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
269 };
270
271 self.core.generate_account_state(
273 balances, margins, true, ts_event,
275 )?;
276
277 tracing::info!("Account state updated successfully");
278 } else {
279 tracing::warn!("No cross margin summary in clearinghouse state");
280 }
281
282 Ok(())
283 }
284
285 fn get_user_address(&self) -> anyhow::Result<String> {
286 let address = self
287 .http_client
288 .get_user_address()
289 .context("failed to get user address from HTTP client")?;
290
291 Ok(address)
292 }
293
294 fn spawn_task<F>(&self, description: &'static str, fut: F)
295 where
296 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
297 {
298 let runtime = get_runtime();
299 let handle = runtime.spawn(async move {
300 if let Err(e) = fut.await {
301 tracing::warn!("{description} failed: {e:?}");
302 }
303 });
304
305 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
306 tasks.retain(|handle| !handle.is_finished());
307 tasks.push(handle);
308 }
309
310 fn abort_pending_tasks(&self) {
311 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
312 for handle in tasks.drain(..) {
313 handle.abort();
314 }
315 }
316
317 fn update_account_state(&self) -> anyhow::Result<()> {
318 let runtime = get_runtime();
319 runtime.block_on(self.refresh_account_state())
320 }
321}
322
323#[async_trait(?Send)]
324impl ExecutionClient for HyperliquidExecutionClient {
325 fn is_connected(&self) -> bool {
326 self.connected
327 }
328
329 fn client_id(&self) -> ClientId {
330 self.core.client_id
331 }
332
333 fn account_id(&self) -> AccountId {
334 self.core.account_id
335 }
336
337 fn venue(&self) -> Venue {
338 *HYPERLIQUID_VENUE
339 }
340
341 fn oms_type(&self) -> OmsType {
342 self.core.oms_type
343 }
344
345 fn get_account(&self) -> Option<AccountAny> {
346 self.core.get_account()
347 }
348
349 fn generate_account_state(
350 &self,
351 balances: Vec<AccountBalance>,
352 margins: Vec<MarginBalance>,
353 reported: bool,
354 ts_event: UnixNanos,
355 ) -> anyhow::Result<()> {
356 self.core
357 .generate_account_state(balances, margins, reported, ts_event)
358 }
359
360 fn start(&mut self) -> anyhow::Result<()> {
361 if self.started {
362 return Ok(());
363 }
364
365 tracing::info!(
366 client_id = %self.core.client_id,
367 account_id = %self.core.account_id,
368 is_testnet = self.config.is_testnet,
369 vault_address = ?self.config.vault_address,
370 http_proxy_url = ?self.config.http_proxy_url,
371 ws_proxy_url = ?self.config.ws_proxy_url,
372 "Starting Hyperliquid execution client"
373 );
374
375 self.ensure_instruments_initialized()?;
377
378 if let Err(e) = self.update_account_state() {
380 tracing::warn!("Failed to initialize account state: {e}");
381 }
382
383 self.connected = true;
384 self.started = true;
385
386 if let Err(e) = get_runtime().block_on(self.start_ws_stream()) {
388 tracing::warn!("Failed to start WebSocket stream: {e}");
389 }
390
391 tracing::info!("Hyperliquid execution client started");
392 Ok(())
393 }
394 fn stop(&mut self) -> anyhow::Result<()> {
395 if !self.started {
396 return Ok(());
397 }
398
399 tracing::info!("Stopping Hyperliquid execution client");
400
401 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
403 handle.abort();
404 }
405
406 self.abort_pending_tasks();
408
409 if self.connected {
411 let runtime = get_runtime();
412 runtime.block_on(async {
413 if let Err(e) = self.ws_client.disconnect().await {
414 tracing::warn!("Error disconnecting WebSocket client: {e}");
415 }
416 });
417 }
418
419 self.connected = false;
420 self.started = false;
421
422 tracing::info!("Hyperliquid execution client stopped");
423 Ok(())
424 }
425
426 fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
427 let order = &command.order;
428
429 if order.is_closed() {
430 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
431 return Ok(());
432 }
433
434 if let Err(e) = self.validate_order_submission(order) {
435 self.core.generate_order_rejected(
436 order.strategy_id(),
437 order.instrument_id(),
438 order.client_order_id(),
439 &format!("validation-error: {e}"),
440 command.ts_init,
441 false,
442 );
443 return Err(e);
444 }
445
446 self.core.generate_order_submitted(
447 order.strategy_id(),
448 order.instrument_id(),
449 order.client_order_id(),
450 command.ts_init,
451 );
452
453 let http_client = self.http_client.clone();
454 let order_clone = order.clone();
455
456 self.spawn_task("submit_order", async move {
457 match order_any_to_hyperliquid_request(&order_clone) {
458 Ok(hyperliquid_order) => {
459 let action = ExchangeAction::order(vec![hyperliquid_order]);
461
462 match http_client.post_action(&action).await {
463 Ok(response) => {
464 if is_response_successful(&response) {
465 tracing::info!("Order submitted successfully: {:?}", response);
466 } else {
469 let error_msg = extract_error_message(&response);
470 tracing::warn!(
471 "Order submission rejected by exchange: {}",
472 error_msg
473 );
474 }
476 }
477 Err(e) => {
478 tracing::warn!("Order submission HTTP request failed: {e}");
479 }
481 }
482 }
483 Err(e) => {
484 tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
485 }
487 }
488
489 Ok(())
490 });
491
492 Ok(())
493 }
494
495 fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
496 tracing::debug!(
497 "Submitting order list with {} orders",
498 command.order_list.orders.len()
499 );
500
501 let http_client = self.http_client.clone();
502 let orders: Vec<OrderAny> = command.order_list.orders.clone();
503
504 for order in &orders {
506 self.core.generate_order_submitted(
507 order.strategy_id(),
508 order.instrument_id(),
509 order.client_order_id(),
510 command.ts_init,
511 );
512 }
513
514 self.spawn_task("submit_order_list", async move {
515 let order_refs: Vec<&OrderAny> = orders.iter().collect();
517 match orders_to_hyperliquid_requests(&order_refs) {
518 Ok(hyperliquid_orders) => {
519 let action = ExchangeAction::order(hyperliquid_orders);
521 match http_client.post_action(&action).await {
522 Ok(response) => {
523 if is_response_successful(&response) {
524 tracing::info!("Order list submitted successfully: {:?}", response);
525 } else {
527 let error_msg = extract_error_message(&response);
528 tracing::warn!(
529 "Order list submission rejected by exchange: {}",
530 error_msg
531 );
532 }
534 }
535 Err(e) => {
536 tracing::warn!("Order list submission HTTP request failed: {e}");
537 }
539 }
540 }
541 Err(e) => {
542 tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
543 }
544 }
545
546 Ok(())
547 });
548
549 Ok(())
550 }
551
552 fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
553 tracing::debug!("Modifying order: {:?}", command);
554
555 let venue_order_id = match command.venue_order_id {
557 Some(id) => id,
558 None => {
559 tracing::warn!("Cannot modify order: venue_order_id is None");
560 return Ok(());
561 }
562 };
563
564 let oid: u64 = match venue_order_id.as_str().parse() {
565 Ok(id) => id,
566 Err(e) => {
567 tracing::warn!(
568 "Failed to parse venue_order_id '{}' as u64: {}",
569 venue_order_id,
570 e
571 );
572 return Ok(());
573 }
574 };
575
576 let http_client = self.http_client.clone();
577 let price = command.price;
578 let quantity = command.quantity;
579 let symbol = command.instrument_id.symbol.inner();
580
581 self.spawn_task("modify_order", async move {
582 use crate::{
583 common::parse::extract_asset_id_from_symbol,
584 http::models::HyperliquidExecModifyOrderRequest,
585 };
586
587 let asset = match extract_asset_id_from_symbol(&symbol) {
589 Ok(asset) => asset,
590 Err(e) => {
591 tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
592 return Ok(());
593 }
594 };
595
596 let modify_request = HyperliquidExecModifyOrderRequest {
598 asset,
599 oid,
600 price: price.map(|p| (*p).into()),
601 size: quantity.map(|q| (*q).into()),
602 reduce_only: None,
603 kind: None,
604 };
605
606 let action = ExchangeAction::modify(oid, modify_request);
607
608 match http_client.post_action(&action).await {
609 Ok(response) => {
610 if is_response_successful(&response) {
611 tracing::info!("Order modified successfully: {:?}", response);
612 } else {
614 let error_msg = extract_error_message(&response);
615 tracing::warn!("Order modification rejected by exchange: {}", error_msg);
616 }
618 }
619 Err(e) => {
620 tracing::warn!("Order modification HTTP request failed: {e}");
621 }
623 }
624
625 Ok(())
626 });
627
628 Ok(())
629 }
630
631 fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
632 tracing::debug!("Cancelling order: {:?}", command);
633
634 let http_client = self.http_client.clone();
635 let client_order_id = command.client_order_id.inner();
636 let symbol = command.instrument_id.symbol.inner();
637
638 self.spawn_task("cancel_order", async move {
639 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
640 Ok(cancel_request) => {
641 let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
643 match http_client.post_action(&action).await {
644 Ok(response) => {
645 if is_response_successful(&response) {
646 tracing::info!("Order cancelled successfully: {:?}", response);
647 } else {
650 let error_msg = extract_error_message(&response);
651 tracing::warn!(
652 "Order cancellation rejected by exchange: {}",
653 error_msg
654 );
655 }
657 }
658 Err(e) => {
659 tracing::warn!("Order cancellation HTTP request failed: {e}");
660 }
662 }
663 }
664 Err(e) => {
665 tracing::warn!(
666 "Failed to convert order to Hyperliquid cancel format: {:?}",
667 e
668 );
669 }
670 }
671
672 Ok(())
673 });
674
675 Ok(())
676 }
677
678 fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
679 tracing::debug!("Cancelling all orders: {:?}", command);
680
681 let cache = self.core.cache().borrow();
683 let open_orders = cache.orders_open(
684 Some(&self.core.venue),
685 Some(&command.instrument_id),
686 None,
687 Some(command.order_side),
688 );
689
690 if open_orders.is_empty() {
691 tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
692 return Ok(());
693 }
694
695 let mut cancel_requests = Vec::new();
697 let symbol = command.instrument_id.symbol.inner();
698 for order in open_orders {
699 let client_order_id = order.client_order_id().inner();
700
701 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
702 Ok(req) => cancel_requests.push(req),
703 Err(e) => {
704 tracing::warn!(
705 "Failed to convert order {} to cancel request: {}",
706 client_order_id,
707 e
708 );
709 continue;
710 }
711 }
712 }
713
714 if cancel_requests.is_empty() {
715 tracing::debug!("No valid cancel requests to send");
716 return Ok(());
717 }
718
719 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
721
722 let http_client = self.http_client.clone();
725 let runtime = get_runtime();
726 runtime.spawn(async move {
727 if let Err(e) = http_client.post_action(&action).await {
728 tracing::warn!("Failed to send cancel all orders request: {e}");
729 }
730 });
731
732 Ok(())
733 }
734
735 fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
736 tracing::debug!("Batch cancelling orders: {:?}", command);
737
738 if command.cancels.is_empty() {
739 tracing::debug!("No orders to cancel in batch");
740 return Ok(());
741 }
742
743 let mut cancel_requests = Vec::new();
745 for cancel_cmd in &command.cancels {
746 let client_order_id = cancel_cmd.client_order_id.inner();
747 let symbol = cancel_cmd.instrument_id.symbol.inner();
748
749 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
750 Ok(req) => cancel_requests.push(req),
751 Err(e) => {
752 tracing::warn!(
753 "Failed to convert order {} to cancel request: {}",
754 client_order_id,
755 e
756 );
757 continue;
758 }
759 }
760 }
761
762 if cancel_requests.is_empty() {
763 tracing::warn!("No valid cancel requests in batch");
764 return Ok(());
765 }
766
767 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
768
769 let http_client = self.http_client.clone();
772 let runtime = get_runtime();
773 runtime.spawn(async move {
774 if let Err(e) = http_client.post_action(&action).await {
775 tracing::warn!("Failed to send batch cancel orders request: {e}");
776 }
777 });
778
779 Ok(())
780 }
781
782 fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
783 tracing::debug!("Querying account: {:?}", command);
784
785 let runtime = get_runtime();
787 runtime.block_on(async {
788 if let Err(e) = self.refresh_account_state().await {
789 tracing::warn!("Failed to query account state: {e}");
790 }
791 });
792
793 Ok(())
794 }
795
796 fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
797 tracing::debug!("Querying order: {:?}", command);
798
799 let cache = self.core.cache().borrow();
801 let venue_order_id = cache.venue_order_id(&command.client_order_id);
802
803 let venue_order_id = match venue_order_id {
804 Some(oid) => *oid,
805 None => {
806 tracing::warn!(
807 "No venue order ID found for client order {}",
808 command.client_order_id
809 );
810 return Ok(());
811 }
812 };
813 drop(cache);
814
815 let oid = match u64::from_str(venue_order_id.as_ref()) {
817 Ok(id) => id,
818 Err(e) => {
819 tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
820 return Ok(());
821 }
822 };
823
824 let user_address = self.get_user_address()?;
826
827 let http_client = self.http_client.clone();
831 let runtime = get_runtime();
832 runtime.spawn(async move {
833 match http_client.info_order_status(&user_address, oid).await {
834 Ok(status) => {
835 tracing::debug!("Order status for oid {}: {:?}", oid, status);
836 }
837 Err(e) => {
838 tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
839 }
840 }
841 });
842
843 Ok(())
844 }
845
846 async fn connect(&mut self) -> anyhow::Result<()> {
847 if self.connected {
848 return Ok(());
849 }
850
851 tracing::info!("Connecting Hyperliquid execution client");
852
853 self.ensure_instruments_initialized_async().await?;
855
856 self.ws_client.connect().await?;
858
859 let user_address = self.get_user_address()?;
861 self.ws_client
862 .subscribe_all_user_channels(&user_address)
863 .await?;
864
865 self.refresh_account_state().await?;
867
868 self.connected = true;
869 self.core.set_connected(true);
870
871 if let Err(e) = self.start_ws_stream().await {
873 tracing::warn!("Failed to start WebSocket stream: {e}");
874 }
875
876 tracing::info!(client_id = %self.core.client_id, "Connected");
877 Ok(())
878 }
879
880 async fn disconnect(&mut self) -> anyhow::Result<()> {
881 if !self.connected {
882 return Ok(());
883 }
884
885 tracing::info!("Disconnecting Hyperliquid execution client");
886
887 self.ws_client.disconnect().await?;
889
890 self.abort_pending_tasks();
892
893 self.connected = false;
894 self.core.set_connected(false);
895
896 tracing::info!(client_id = %self.core.client_id, "Disconnected");
897 Ok(())
898 }
899
900 async fn generate_order_status_report(
901 &self,
902 _cmd: &GenerateOrderStatusReport,
903 ) -> anyhow::Result<Option<OrderStatusReport>> {
904 tracing::warn!("generate_order_status_report not yet fully implemented");
908 Ok(None)
909 }
910
911 async fn generate_order_status_reports(
912 &self,
913 cmd: &GenerateOrderStatusReports,
914 ) -> anyhow::Result<Vec<OrderStatusReport>> {
915 let user_address = self.get_user_address()?;
916
917 let reports = self
918 .http_client
919 .request_order_status_reports(&user_address, cmd.instrument_id)
920 .await
921 .context("failed to generate order status reports")?;
922
923 let reports = if cmd.open_only {
925 reports
926 .into_iter()
927 .filter(|r| r.order_status.is_open())
928 .collect()
929 } else {
930 reports
931 };
932
933 let reports = match (cmd.start, cmd.end) {
935 (Some(start), Some(end)) => reports
936 .into_iter()
937 .filter(|r| r.ts_last >= start && r.ts_last <= end)
938 .collect(),
939 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
940 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
941 (None, None) => reports,
942 };
943
944 tracing::info!("Generated {} order status reports", reports.len());
945 Ok(reports)
946 }
947
948 async fn generate_fill_reports(
949 &self,
950 cmd: GenerateFillReports,
951 ) -> anyhow::Result<Vec<FillReport>> {
952 let user_address = self.get_user_address()?;
953
954 let reports = self
955 .http_client
956 .request_fill_reports(&user_address, cmd.instrument_id)
957 .await
958 .context("failed to generate fill reports")?;
959
960 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
962 reports
963 .into_iter()
964 .filter(|r| r.ts_event >= start && r.ts_event <= end)
965 .collect()
966 } else if let Some(start) = cmd.start {
967 reports
968 .into_iter()
969 .filter(|r| r.ts_event >= start)
970 .collect()
971 } else if let Some(end) = cmd.end {
972 reports.into_iter().filter(|r| r.ts_event <= end).collect()
973 } else {
974 reports
975 };
976
977 tracing::info!("Generated {} fill reports", reports.len());
978 Ok(reports)
979 }
980
981 async fn generate_position_status_reports(
982 &self,
983 cmd: &GeneratePositionStatusReports,
984 ) -> anyhow::Result<Vec<PositionStatusReport>> {
985 let user_address = self.get_user_address()?;
986
987 let reports = self
988 .http_client
989 .request_position_status_reports(&user_address, cmd.instrument_id)
990 .await
991 .context("failed to generate position status reports")?;
992
993 tracing::info!("Generated {} position status reports", reports.len());
994 Ok(reports)
995 }
996
997 async fn generate_mass_status(
998 &self,
999 lookback_mins: Option<u64>,
1000 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1001 tracing::warn!(
1002 "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
1003 );
1004 Ok(None)
1010 }
1011}
1012
1013impl HyperliquidExecutionClient {
1014 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1015 {
1016 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1017 if handle_guard.is_some() {
1018 return Ok(());
1019 }
1020 }
1021
1022 let user_address = self.get_user_address()?;
1023 let _account_id = self.core.account_id;
1024 let mut ws_client = self.ws_client.clone();
1025
1026 let instruments = self
1027 .http_client
1028 .request_instruments()
1029 .await
1030 .unwrap_or_default();
1031
1032 for instrument in instruments {
1033 ws_client.cache_instrument(instrument);
1034 }
1035
1036 let runtime = get_runtime();
1037 let handle = runtime.spawn(async move {
1038 if let Err(e) = ws_client.connect().await {
1039 tracing::warn!("Failed to connect WebSocket: {e}");
1040 return;
1041 }
1042
1043 if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1044 tracing::warn!("Failed to subscribe to order updates: {e}");
1045 return;
1046 }
1047
1048 if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1049 tracing::warn!("Failed to subscribe to user events: {e}");
1050 return;
1051 }
1052
1053 tracing::info!("Subscribed to Hyperliquid execution updates");
1054
1055 let _clock = get_atomic_clock_realtime();
1056
1057 loop {
1058 let event = ws_client.next_event().await;
1059
1060 match event {
1061 Some(msg) => {
1062 match msg {
1063 NautilusWsMessage::ExecutionReports(reports) => {
1064 for report in reports {
1066 dispatch_execution_report(report);
1067 }
1068 }
1069 NautilusWsMessage::Reconnected => {
1070 tracing::info!("WebSocket reconnected");
1071 }
1073 NautilusWsMessage::Error(e) => {
1074 tracing::error!("WebSocket error: {e}");
1075 }
1076 NautilusWsMessage::Trades(_)
1078 | NautilusWsMessage::Quote(_)
1079 | NautilusWsMessage::Deltas(_)
1080 | NautilusWsMessage::Candle(_)
1081 | NautilusWsMessage::MarkPrice(_)
1082 | NautilusWsMessage::IndexPrice(_)
1083 | NautilusWsMessage::FundingRate(_) => {}
1084 }
1085 }
1086 None => {
1087 tracing::warn!("WebSocket next_event returned None");
1088 break;
1089 }
1090 }
1091 }
1092 });
1093
1094 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1095 tracing::info!("Hyperliquid WebSocket execution stream started");
1096 Ok(())
1097 }
1098}
1099
1100fn dispatch_execution_report(report: ExecutionReport) {
1101 let sender = get_exec_event_sender();
1102 match report {
1103 ExecutionReport::Order(order_report) => {
1104 let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1105 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1106 tracing::warn!("Failed to send order status report: {e}");
1107 }
1108 }
1109 ExecutionReport::Fill(fill_report) => {
1110 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1111 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1112 tracing::warn!("Failed to send fill report: {e}");
1113 }
1114 }
1115 }
1116}