1use std::{str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use nautilus_common::{
23 live::{runner::get_exec_event_sender, runtime::get_runtime},
24 messages::{
25 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
26 execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
29 QueryOrder, SubmitOrder, SubmitOrderList,
30 },
31 },
32};
33use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
34use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
35use nautilus_live::execution::client::LiveExecutionClient;
36use nautilus_model::{
37 accounts::AccountAny,
38 enums::{OmsType, OrderType},
39 identifiers::{AccountId, ClientId, Venue},
40 orders::{Order, any::OrderAny},
41 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, MarginBalance},
43};
44use serde_json;
45use tokio::task::JoinHandle;
46
47use crate::{
48 common::{
49 HyperliquidProductType,
50 consts::HYPERLIQUID_VENUE,
51 credential::Secrets,
52 parse::{
53 client_order_id_to_cancel_request, extract_error_message, is_response_successful,
54 order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
55 },
56 },
57 config::HyperliquidExecClientConfig,
58 http::{client::HyperliquidHttpClient, models::ClearinghouseState, query::ExchangeAction},
59 websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
60};
61
/// Execution client for the Hyperliquid venue.
///
/// Bundles the shared execution core with the HTTP and WebSocket clients and
/// tracks lifecycle flags plus the handles of the background tasks it spawns.
#[derive(Debug)]
pub struct HyperliquidExecutionClient {
    /// Shared execution core: identifiers, cache access and event generation.
    core: ExecutionClientCore,
    /// Configuration supplied at construction time.
    config: HyperliquidExecClientConfig,
    /// HTTP client used for instrument bootstrap, info queries and exchange actions.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; cloned into the background stream task.
    ws_client: HyperliquidWebSocketClient,
    /// Set by `start`, cleared by `stop`.
    started: bool,
    /// Connection flag reported by `is_connected`.
    connected: bool,
    /// Set once the instrument bootstrap request has completed.
    instruments_initialized: bool,
    /// Handles of tasks spawned through `spawn_task`, kept so they can be aborted.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the WebSocket stream task, if one has been spawned.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
}
74
75impl HyperliquidExecutionClient {
    /// Returns a reference to the configuration held by this client.
    pub fn config(&self) -> &HyperliquidExecClientConfig {
        &self.config
    }
80
81 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
96 let instrument_id = order.instrument_id();
99 let symbol = instrument_id.symbol.as_str();
100 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
101 anyhow::bail!(
102 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
103 );
104 }
105
106 match order.order_type() {
108 OrderType::Market
109 | OrderType::Limit
110 | OrderType::StopMarket
111 | OrderType::StopLimit
112 | OrderType::MarketIfTouched
113 | OrderType::LimitIfTouched => {}
114 _ => anyhow::bail!(
115 "Unsupported order type for Hyperliquid: {:?}",
116 order.order_type()
117 ),
118 }
119
120 if matches!(
122 order.order_type(),
123 OrderType::StopMarket
124 | OrderType::StopLimit
125 | OrderType::MarketIfTouched
126 | OrderType::LimitIfTouched
127 ) && order.trigger_price().is_none()
128 {
129 anyhow::bail!(
130 "Conditional orders require a trigger price for Hyperliquid: {:?}",
131 order.order_type()
132 );
133 }
134
135 if matches!(
137 order.order_type(),
138 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
139 ) && order.price().is_none()
140 {
141 anyhow::bail!(
142 "Limit orders require a limit price for Hyperliquid: {:?}",
143 order.order_type()
144 );
145 }
146
147 Ok(())
148 }
149
150 pub fn new(
156 core: ExecutionClientCore,
157 config: HyperliquidExecClientConfig,
158 ) -> anyhow::Result<Self> {
159 if !config.has_credentials() {
160 anyhow::bail!("Hyperliquid execution client requires private key");
161 }
162
163 let secrets = Secrets::from_json(&format!(
164 r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
165 config.private_key, config.is_testnet
166 ))
167 .context("failed to create secrets from private key")?;
168
169 let http_client = HyperliquidHttpClient::with_credentials(
170 &secrets,
171 Some(config.http_timeout_secs),
172 config.http_proxy_url.clone(),
173 )
174 .context("failed to create Hyperliquid HTTP client")?;
175
176 let ws_client = HyperliquidWebSocketClient::new(
180 None,
181 config.is_testnet,
182 HyperliquidProductType::Perp,
183 Some(core.account_id),
184 );
185
186 Ok(Self {
187 core,
188 config,
189 http_client,
190 ws_client,
191 started: false,
192 connected: false,
193 instruments_initialized: false,
194 pending_tasks: Mutex::new(Vec::new()),
195 ws_stream_handle: Mutex::new(None),
196 })
197 }
198
199 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
200 if self.instruments_initialized {
201 return Ok(());
202 }
203
204 let instruments = self
205 .http_client
206 .request_instruments()
207 .await
208 .context("failed to request Hyperliquid instruments")?;
209
210 if instruments.is_empty() {
211 tracing::warn!(
212 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
213 );
214 } else {
215 tracing::info!("Initialized {} instruments", instruments.len());
216
217 for instrument in &instruments {
218 self.http_client.cache_instrument(instrument.clone());
219 }
220 }
221
222 self.instruments_initialized = true;
223 Ok(())
224 }
225
226 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
227 if self.instruments_initialized {
228 return Ok(());
229 }
230
231 let runtime = get_runtime();
232 runtime.block_on(self.ensure_instruments_initialized_async())
233 }
234
235 async fn refresh_account_state(&self) -> anyhow::Result<()> {
236 let user_address = self.get_user_address()?;
239
240 let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
242
243 let clearinghouse_state = self
245 .http_client
246 .info_clearinghouse_state(account_address)
247 .await
248 .context("failed to fetch clearinghouse state")?;
249
250 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
252 .context("failed to deserialize clearinghouse state")?;
253
254 tracing::debug!(
255 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
256 state.cross_margin_summary,
257 state.asset_positions.len()
258 );
259
260 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
262 let (balances, margins) =
263 crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
264 .context("failed to parse account balances and margins")?;
265
266 let ts_event = if let Some(time_ms) = state.time {
267 nautilus_core::UnixNanos::from(time_ms * 1_000_000)
268 } else {
269 nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
270 };
271
272 self.core.generate_account_state(
274 balances, margins, true, ts_event,
276 )?;
277
278 tracing::info!("Account state updated successfully");
279 } else {
280 tracing::warn!("No cross margin summary in clearinghouse state");
281 }
282
283 Ok(())
284 }
285
286 fn get_user_address(&self) -> anyhow::Result<String> {
287 let address = self
288 .http_client
289 .get_user_address()
290 .context("failed to get user address from HTTP client")?;
291
292 Ok(address)
293 }
294
295 fn spawn_task<F>(&self, description: &'static str, fut: F)
296 where
297 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
298 {
299 let runtime = get_runtime();
300 let handle = runtime.spawn(async move {
301 if let Err(e) = fut.await {
302 tracing::warn!("{description} failed: {e:?}");
303 }
304 });
305
306 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
307 tasks.retain(|handle| !handle.is_finished());
308 tasks.push(handle);
309 }
310
311 fn abort_pending_tasks(&self) {
312 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
313 for handle in tasks.drain(..) {
314 handle.abort();
315 }
316 }
317
318 fn update_account_state(&self) -> anyhow::Result<()> {
319 let runtime = get_runtime();
320 runtime.block_on(self.refresh_account_state())
321 }
322}
323
324#[async_trait(?Send)]
325impl ExecutionClient for HyperliquidExecutionClient {
    /// Returns the value of this client's `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected
    }
329
    /// Returns the client ID held by the execution core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
333
    /// Returns the account ID held by the execution core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
337
    /// Returns the venue from the `HYPERLIQUID_VENUE` constant.
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
341
    /// Returns the OMS type held by the execution core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
345
    /// Delegates to the execution core's `get_account`.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
349
    /// Delegates to the execution core's `generate_account_state`, passing
    /// every argument through unchanged.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
360
361 fn start(&mut self) -> anyhow::Result<()> {
362 if self.started {
363 return Ok(());
364 }
365
366 tracing::info!(
367 client_id = %self.core.client_id,
368 account_id = %self.core.account_id,
369 is_testnet = self.config.is_testnet,
370 vault_address = ?self.config.vault_address,
371 http_proxy_url = ?self.config.http_proxy_url,
372 ws_proxy_url = ?self.config.ws_proxy_url,
373 "Starting Hyperliquid execution client"
374 );
375
376 self.ensure_instruments_initialized()?;
378
379 if let Err(e) = self.update_account_state() {
381 tracing::warn!("Failed to initialize account state: {e}");
382 }
383
384 self.connected = true;
385 self.started = true;
386
387 if let Err(e) = get_runtime().block_on(self.start_ws_stream()) {
389 tracing::warn!("Failed to start WebSocket stream: {e}");
390 }
391
392 tracing::info!("Hyperliquid execution client started");
393 Ok(())
394 }
395 fn stop(&mut self) -> anyhow::Result<()> {
396 if !self.started {
397 return Ok(());
398 }
399
400 tracing::info!("Stopping Hyperliquid execution client");
401
402 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
404 handle.abort();
405 }
406
407 self.abort_pending_tasks();
409
410 if self.connected {
412 let runtime = get_runtime();
413 runtime.block_on(async {
414 if let Err(e) = self.ws_client.disconnect().await {
415 tracing::warn!("Error disconnecting WebSocket client: {e}");
416 }
417 });
418 }
419
420 self.connected = false;
421 self.started = false;
422
423 tracing::info!("Hyperliquid execution client stopped");
424 Ok(())
425 }
426
427 fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
428 let order = &command.order;
429
430 if order.is_closed() {
431 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
432 return Ok(());
433 }
434
435 if let Err(e) = self.validate_order_submission(order) {
436 self.core.generate_order_rejected(
437 order.strategy_id(),
438 order.instrument_id(),
439 order.client_order_id(),
440 &format!("validation-error: {e}"),
441 command.ts_init,
442 false,
443 );
444 return Err(e);
445 }
446
447 self.core.generate_order_submitted(
448 order.strategy_id(),
449 order.instrument_id(),
450 order.client_order_id(),
451 command.ts_init,
452 );
453
454 let http_client = self.http_client.clone();
455 let order_clone = order.clone();
456
457 self.spawn_task("submit_order", async move {
458 match order_any_to_hyperliquid_request(&order_clone) {
459 Ok(hyperliquid_order) => {
460 let action = ExchangeAction::order(vec![hyperliquid_order]);
462
463 match http_client.post_action(&action).await {
464 Ok(response) => {
465 if is_response_successful(&response) {
466 tracing::info!("Order submitted successfully: {:?}", response);
467 } else {
470 let error_msg = extract_error_message(&response);
471 tracing::warn!(
472 "Order submission rejected by exchange: {}",
473 error_msg
474 );
475 }
477 }
478 Err(e) => {
479 tracing::warn!("Order submission HTTP request failed: {e}");
480 }
482 }
483 }
484 Err(e) => {
485 tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
486 }
488 }
489
490 Ok(())
491 });
492
493 Ok(())
494 }
495
496 fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
497 tracing::debug!(
498 "Submitting order list with {} orders",
499 command.order_list.orders.len()
500 );
501
502 let http_client = self.http_client.clone();
503 let orders: Vec<OrderAny> = command.order_list.orders.clone();
504
505 for order in &orders {
507 self.core.generate_order_submitted(
508 order.strategy_id(),
509 order.instrument_id(),
510 order.client_order_id(),
511 command.ts_init,
512 );
513 }
514
515 self.spawn_task("submit_order_list", async move {
516 let order_refs: Vec<&OrderAny> = orders.iter().collect();
518 match orders_to_hyperliquid_requests(&order_refs) {
519 Ok(hyperliquid_orders) => {
520 let action = ExchangeAction::order(hyperliquid_orders);
522 match http_client.post_action(&action).await {
523 Ok(response) => {
524 if is_response_successful(&response) {
525 tracing::info!("Order list submitted successfully: {:?}", response);
526 } else {
528 let error_msg = extract_error_message(&response);
529 tracing::warn!(
530 "Order list submission rejected by exchange: {}",
531 error_msg
532 );
533 }
535 }
536 Err(e) => {
537 tracing::warn!("Order list submission HTTP request failed: {e}");
538 }
540 }
541 }
542 Err(e) => {
543 tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
544 }
545 }
546
547 Ok(())
548 });
549
550 Ok(())
551 }
552
553 fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
554 tracing::debug!("Modifying order: {:?}", command);
555
556 let oid: u64 = match command.venue_order_id.as_str().parse() {
558 Ok(id) => id,
559 Err(e) => {
560 tracing::warn!(
561 "Failed to parse venue_order_id '{}' as u64: {}",
562 command.venue_order_id,
563 e
564 );
565 return Ok(());
566 }
567 };
568
569 let http_client = self.http_client.clone();
570 let price = command.price;
571 let quantity = command.quantity;
572 let symbol = command.instrument_id.symbol.inner();
573
574 self.spawn_task("modify_order", async move {
575 use crate::{
576 common::parse::extract_asset_id_from_symbol,
577 http::models::HyperliquidExecModifyOrderRequest,
578 };
579
580 let asset = match extract_asset_id_from_symbol(&symbol) {
582 Ok(asset) => asset,
583 Err(e) => {
584 tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
585 return Ok(());
586 }
587 };
588
589 let modify_request = HyperliquidExecModifyOrderRequest {
591 asset,
592 oid,
593 price: price.map(|p| (*p).into()),
594 size: quantity.map(|q| (*q).into()),
595 reduce_only: None,
596 kind: None,
597 };
598
599 let action = ExchangeAction::modify(oid, modify_request);
600
601 match http_client.post_action(&action).await {
602 Ok(response) => {
603 if is_response_successful(&response) {
604 tracing::info!("Order modified successfully: {:?}", response);
605 } else {
607 let error_msg = extract_error_message(&response);
608 tracing::warn!("Order modification rejected by exchange: {}", error_msg);
609 }
611 }
612 Err(e) => {
613 tracing::warn!("Order modification HTTP request failed: {e}");
614 }
616 }
617
618 Ok(())
619 });
620
621 Ok(())
622 }
623
624 fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
625 tracing::debug!("Cancelling order: {:?}", command);
626
627 let http_client = self.http_client.clone();
628 let client_order_id = command.client_order_id.inner();
629 let symbol = command.instrument_id.symbol.inner();
630
631 self.spawn_task("cancel_order", async move {
632 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
633 Ok(cancel_request) => {
634 let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
636 match http_client.post_action(&action).await {
637 Ok(response) => {
638 if is_response_successful(&response) {
639 tracing::info!("Order cancelled successfully: {:?}", response);
640 } else {
643 let error_msg = extract_error_message(&response);
644 tracing::warn!(
645 "Order cancellation rejected by exchange: {}",
646 error_msg
647 );
648 }
650 }
651 Err(e) => {
652 tracing::warn!("Order cancellation HTTP request failed: {e}");
653 }
655 }
656 }
657 Err(e) => {
658 tracing::warn!(
659 "Failed to convert order to Hyperliquid cancel format: {:?}",
660 e
661 );
662 }
663 }
664
665 Ok(())
666 });
667
668 Ok(())
669 }
670
671 fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
672 tracing::debug!("Cancelling all orders: {:?}", command);
673
674 let cache = self.core.cache().borrow();
676 let open_orders = cache.orders_open(
677 Some(&self.core.venue),
678 Some(&command.instrument_id),
679 None,
680 Some(command.order_side),
681 );
682
683 if open_orders.is_empty() {
684 tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
685 return Ok(());
686 }
687
688 let mut cancel_requests = Vec::new();
690 let symbol = command.instrument_id.symbol.inner();
691 for order in open_orders {
692 let client_order_id = order.client_order_id().inner();
693
694 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
695 Ok(req) => cancel_requests.push(req),
696 Err(e) => {
697 tracing::warn!(
698 "Failed to convert order {} to cancel request: {}",
699 client_order_id,
700 e
701 );
702 continue;
703 }
704 }
705 }
706
707 if cancel_requests.is_empty() {
708 tracing::debug!("No valid cancel requests to send");
709 return Ok(());
710 }
711
712 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
714
715 let http_client = self.http_client.clone();
718 let runtime = get_runtime();
719 runtime.spawn(async move {
720 if let Err(e) = http_client.post_action(&action).await {
721 tracing::warn!("Failed to send cancel all orders request: {e}");
722 }
723 });
724
725 Ok(())
726 }
727
728 fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
729 tracing::debug!("Batch cancelling orders: {:?}", command);
730
731 if command.cancels.is_empty() {
732 tracing::debug!("No orders to cancel in batch");
733 return Ok(());
734 }
735
736 let mut cancel_requests = Vec::new();
738 for cancel_cmd in &command.cancels {
739 let client_order_id = cancel_cmd.client_order_id.inner();
740 let symbol = cancel_cmd.instrument_id.symbol.inner();
741
742 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
743 Ok(req) => cancel_requests.push(req),
744 Err(e) => {
745 tracing::warn!(
746 "Failed to convert order {} to cancel request: {}",
747 client_order_id,
748 e
749 );
750 continue;
751 }
752 }
753 }
754
755 if cancel_requests.is_empty() {
756 tracing::warn!("No valid cancel requests in batch");
757 return Ok(());
758 }
759
760 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
761
762 let http_client = self.http_client.clone();
765 let runtime = get_runtime();
766 runtime.spawn(async move {
767 if let Err(e) = http_client.post_action(&action).await {
768 tracing::warn!("Failed to send batch cancel orders request: {e}");
769 }
770 });
771
772 Ok(())
773 }
774
775 fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
776 tracing::debug!("Querying account: {:?}", command);
777
778 let runtime = get_runtime();
780 runtime.block_on(async {
781 if let Err(e) = self.refresh_account_state().await {
782 tracing::warn!("Failed to query account state: {e}");
783 }
784 });
785
786 Ok(())
787 }
788
789 fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
790 tracing::debug!("Querying order: {:?}", command);
791
792 let cache = self.core.cache().borrow();
794 let venue_order_id = cache.venue_order_id(&command.client_order_id);
795
796 let venue_order_id = match venue_order_id {
797 Some(oid) => *oid,
798 None => {
799 tracing::warn!(
800 "No venue order ID found for client order {}",
801 command.client_order_id
802 );
803 return Ok(());
804 }
805 };
806 drop(cache);
807
808 let oid = match u64::from_str(venue_order_id.as_ref()) {
810 Ok(id) => id,
811 Err(e) => {
812 tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
813 return Ok(());
814 }
815 };
816
817 let user_address = self.get_user_address()?;
819
820 let http_client = self.http_client.clone();
824 let runtime = get_runtime();
825 runtime.spawn(async move {
826 match http_client.info_order_status(&user_address, oid).await {
827 Ok(status) => {
828 tracing::debug!("Order status for oid {}: {:?}", oid, status);
829 }
830 Err(e) => {
831 tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
832 }
833 }
834 });
835
836 Ok(())
837 }
838
    /// Connects the client: bootstraps instruments, connects the WebSocket
    /// client, subscribes to all user channels, refreshes the account state
    /// and starts the WebSocket stream task.
    ///
    /// Returns immediately when `self.connected` is already set.
    /// NOTE(review): `start` also sets `self.connected`, so a `connect` call
    /// made after `start` takes this early return — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument bootstrap, the WebSocket connect,
    /// the user address lookup, the subscription or the account refresh
    /// fails. A failure to start the stream task is only logged.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.connected {
            return Ok(());
        }

        tracing::info!("Connecting Hyperliquid execution client");

        self.ensure_instruments_initialized_async().await?;

        self.ws_client.connect().await?;

        let user_address = self.get_user_address()?;
        self.ws_client
            .subscribe_all_user_channels(&user_address)
            .await?;

        self.refresh_account_state().await?;

        // Both the local flag and the core's connection state are updated.
        self.connected = true;
        self.core.set_connected(true);

        // Best effort: a stream start failure does not fail the connect.
        if let Err(e) = self.start_ws_stream().await {
            tracing::warn!("Failed to start WebSocket stream: {e}");
        }

        tracing::info!(client_id = %self.core.client_id, "Connected");
        Ok(())
    }
872
873 async fn disconnect(&mut self) -> anyhow::Result<()> {
874 if !self.connected {
875 return Ok(());
876 }
877
878 tracing::info!("Disconnecting Hyperliquid execution client");
879
880 self.ws_client.disconnect().await?;
882
883 self.abort_pending_tasks();
885
886 self.connected = false;
887 self.core.set_connected(false);
888
889 tracing::info!(client_id = %self.core.client_id, "Disconnected");
890 Ok(())
891 }
892}
893
894#[async_trait(?Send)]
895impl LiveExecutionClient for HyperliquidExecutionClient {
    /// Stub: logs a warning and returns `Ok(None)`; the command is ignored.
    async fn generate_order_status_report(
        &self,
        _cmd: &GenerateOrderStatusReport,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        tracing::warn!("generate_order_status_report not yet fully implemented");
        Ok(None)
    }
906
907 async fn generate_order_status_reports(
908 &self,
909 cmd: &GenerateOrderStatusReport,
910 ) -> anyhow::Result<Vec<OrderStatusReport>> {
911 let user_address = self.get_user_address()?;
912
913 let reports = self
914 .http_client
915 .request_order_status_reports(&user_address, cmd.instrument_id)
916 .await
917 .context("failed to generate order status reports")?;
918
919 let reports = if let Some(client_order_id) = cmd.client_order_id {
921 reports
922 .into_iter()
923 .filter(|r| r.client_order_id == Some(client_order_id))
924 .collect()
925 } else {
926 reports
927 };
928
929 tracing::info!("Generated {} order status reports", reports.len());
933 Ok(reports)
934 }
935
936 async fn generate_fill_reports(
937 &self,
938 cmd: GenerateFillReports,
939 ) -> anyhow::Result<Vec<FillReport>> {
940 let user_address = self.get_user_address()?;
941
942 let reports = self
943 .http_client
944 .request_fill_reports(&user_address, cmd.instrument_id)
945 .await
946 .context("failed to generate fill reports")?;
947
948 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
950 reports
951 .into_iter()
952 .filter(|r| r.ts_event >= start && r.ts_event <= end)
953 .collect()
954 } else if let Some(start) = cmd.start {
955 reports
956 .into_iter()
957 .filter(|r| r.ts_event >= start)
958 .collect()
959 } else if let Some(end) = cmd.end {
960 reports.into_iter().filter(|r| r.ts_event <= end).collect()
961 } else {
962 reports
963 };
964
965 tracing::info!("Generated {} fill reports", reports.len());
966 Ok(reports)
967 }
968
969 async fn generate_position_status_reports(
970 &self,
971 cmd: &GeneratePositionReports,
972 ) -> anyhow::Result<Vec<PositionStatusReport>> {
973 let user_address = self.get_user_address()?;
974
975 let reports = self
976 .http_client
977 .request_position_status_reports(&user_address, cmd.instrument_id)
978 .await
979 .context("failed to generate position status reports")?;
980
981 tracing::info!("Generated {} position status reports", reports.len());
982 Ok(reports)
983 }
984
    /// Stub: logs a warning that includes `lookback_mins` and returns `Ok(None)`.
    async fn generate_mass_status(
        &self,
        lookback_mins: Option<u64>,
    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
        tracing::warn!(
            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
        );
        Ok(None)
    }
999}
1000
1001impl HyperliquidExecutionClient {
1002 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1003 {
1004 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1005 if handle_guard.is_some() {
1006 return Ok(());
1007 }
1008 }
1009
1010 let user_address = self.get_user_address()?;
1011 let _account_id = self.core.account_id;
1012 let mut ws_client = self.ws_client.clone();
1013
1014 let instruments = self
1015 .http_client
1016 .request_instruments()
1017 .await
1018 .unwrap_or_default();
1019
1020 for instrument in instruments {
1021 ws_client.cache_instrument(instrument);
1022 }
1023
1024 let runtime = get_runtime();
1025 let handle = runtime.spawn(async move {
1026 if let Err(e) = ws_client.connect().await {
1027 tracing::warn!("Failed to connect WebSocket: {e}");
1028 return;
1029 }
1030
1031 if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1032 tracing::warn!("Failed to subscribe to order updates: {e}");
1033 return;
1034 }
1035
1036 if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1037 tracing::warn!("Failed to subscribe to user events: {e}");
1038 return;
1039 }
1040
1041 tracing::info!("Subscribed to Hyperliquid execution updates");
1042
1043 let _clock = get_atomic_clock_realtime();
1044
1045 loop {
1046 let event = ws_client.next_event().await;
1047
1048 match event {
1049 Some(msg) => {
1050 match msg {
1051 NautilusWsMessage::ExecutionReports(reports) => {
1052 for report in reports {
1054 dispatch_execution_report(report);
1055 }
1056 }
1057 NautilusWsMessage::Reconnected => {
1058 tracing::info!("WebSocket reconnected");
1059 }
1061 NautilusWsMessage::Error(e) => {
1062 tracing::error!("WebSocket error: {e}");
1063 }
1064 NautilusWsMessage::Trades(_)
1066 | NautilusWsMessage::Quote(_)
1067 | NautilusWsMessage::Deltas(_)
1068 | NautilusWsMessage::Candle(_)
1069 | NautilusWsMessage::MarkPrice(_)
1070 | NautilusWsMessage::IndexPrice(_)
1071 | NautilusWsMessage::FundingRate(_) => {}
1072 }
1073 }
1074 None => {
1075 tracing::warn!("WebSocket next_event returned None");
1076 break;
1077 }
1078 }
1079 }
1080 });
1081
1082 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1083 tracing::info!("Hyperliquid WebSocket execution stream started");
1084 Ok(())
1085 }
1086}
1087
1088fn dispatch_execution_report(report: ExecutionReport) {
1089 let sender = get_exec_event_sender();
1090 match report {
1091 ExecutionReport::Order(order_report) => {
1092 let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1093 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1094 tracing::warn!("Failed to send order status report: {e}");
1095 }
1096 }
1097 ExecutionReport::Fill(fill_report) => {
1098 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1099 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1100 tracing::warn!("Failed to send fill report: {e}");
1101 }
1102 }
1103 }
1104}