1use std::{cell::Ref, str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use nautilus_common::{
22 clock::Clock,
23 messages::{
24 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
25 execution::{
26 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
27 SubmitOrder, SubmitOrderList,
28 },
29 },
30 runner::get_exec_event_sender,
31 runtime::get_runtime,
32};
33use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
34use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
35use nautilus_live::execution::LiveExecutionClientExt;
36use nautilus_model::{
37 accounts::AccountAny,
38 enums::{OmsType, OrderType},
39 identifiers::{AccountId, ClientId, Venue},
40 orders::{Order, any::OrderAny},
41 types::{AccountBalance, MarginBalance},
42};
43use serde_json;
44use tokio::task::JoinHandle;
45
46use crate::{
47 common::{
48 consts::HYPERLIQUID_VENUE,
49 credential::Secrets,
50 parse::{
51 client_order_id_to_cancel_request, extract_error_message, is_response_successful,
52 order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
53 },
54 },
55 config::HyperliquidExecClientConfig,
56 http::{client::HyperliquidHttpClient, query::ExchangeAction},
57 websocket::{
58 ExecutionReport,
59 client::HyperliquidWebSocketClient,
60 messages::HyperliquidWsMessage as HyperliquidWsMsg,
61 parse::{parse_ws_fill_report, parse_ws_order_status_report},
62 },
63};
64
/// Execution client for the Hyperliquid venue.
///
/// Bundles the shared execution-client core with an HTTP client (used for
/// exchange actions and info queries) and a WebSocket client (used for order
/// and fill updates), plus handles to the background tasks it spawns.
#[derive(Debug)]
pub struct HyperliquidExecutionClient {
    /// Shared execution client core (identifiers, cache access, event generation).
    core: ExecutionClientCore,
    /// Configuration supplied at construction.
    config: HyperliquidExecClientConfig,
    /// HTTP client built from the configured credentials.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; replaced with a freshly connected one in `connect`.
    ws_client: HyperliquidWebSocketClient,
    /// Set by `start` and cleared by `stop`.
    started: bool,
    /// Set by `start`/`connect` and cleared by `stop`/`disconnect`.
    connected: bool,
    /// Set once the instrument bootstrap request has succeeded.
    instruments_initialized: bool,
    /// Handles of spawned background tasks, kept so they can be aborted on shutdown.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the WebSocket stream task, if one has been started.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
}
77
78impl HyperliquidExecutionClient {
    /// Returns a reference to the configuration this client was constructed with.
    pub fn config(&self) -> &HyperliquidExecClientConfig {
        &self.config
    }
83
84 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
99 let symbol = order.instrument_id().symbol.to_string();
101 if !symbol.ends_with("-USD") {
102 anyhow::bail!("Unsupported instrument symbol format for Hyperliquid: {symbol}");
103 }
104
105 match order.order_type() {
107 OrderType::Market
108 | OrderType::Limit
109 | OrderType::StopMarket
110 | OrderType::StopLimit
111 | OrderType::MarketIfTouched
112 | OrderType::LimitIfTouched => {}
113 _ => anyhow::bail!(
114 "Unsupported order type for Hyperliquid: {:?}",
115 order.order_type()
116 ),
117 }
118
119 if matches!(
121 order.order_type(),
122 OrderType::StopMarket
123 | OrderType::StopLimit
124 | OrderType::MarketIfTouched
125 | OrderType::LimitIfTouched
126 ) && order.trigger_price().is_none()
127 {
128 anyhow::bail!(
129 "Conditional orders require a trigger price for Hyperliquid: {:?}",
130 order.order_type()
131 );
132 }
133
134 if matches!(
136 order.order_type(),
137 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
138 ) && order.price().is_none()
139 {
140 anyhow::bail!(
141 "Limit orders require a limit price for Hyperliquid: {:?}",
142 order.order_type()
143 );
144 }
145
146 Ok(())
147 }
148
149 pub fn new(
155 core: ExecutionClientCore,
156 config: HyperliquidExecClientConfig,
157 ) -> anyhow::Result<Self> {
158 if !config.has_credentials() {
159 anyhow::bail!("Hyperliquid execution client requires private key");
160 }
161
162 let secrets = Secrets::from_json(&format!(
163 r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
164 config.private_key, config.is_testnet
165 ))
166 .context("failed to create secrets from private key")?;
167
168 let http_client =
169 HyperliquidHttpClient::with_credentials(&secrets, Some(config.http_timeout_secs));
170
171 let ws_client = HyperliquidWebSocketClient::new(
173 crate::common::consts::ws_url(config.is_testnet).to_string(),
174 );
175
176 Ok(Self {
177 core,
178 config,
179 http_client,
180 ws_client,
181 started: false,
182 connected: false,
183 instruments_initialized: false,
184 pending_tasks: Mutex::new(Vec::new()),
185 ws_stream_handle: Mutex::new(None),
186 })
187 }
188
189 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
190 if self.instruments_initialized {
191 return Ok(());
192 }
193
194 let instruments = self
195 .http_client
196 .request_instruments()
197 .await
198 .context("failed to request Hyperliquid instruments")?;
199
200 if instruments.is_empty() {
201 tracing::warn!(
202 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
203 );
204 } else {
205 tracing::info!("Initialized {} instruments", instruments.len());
206 }
207
208 self.instruments_initialized = true;
209 Ok(())
210 }
211
212 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
213 if self.instruments_initialized {
214 return Ok(());
215 }
216
217 let runtime = get_runtime();
218 runtime.block_on(self.ensure_instruments_initialized_async())
219 }
220
221 async fn refresh_account_state(&self) -> anyhow::Result<()> {
222 let user_address = self.get_user_address()?;
225
226 let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
228
229 let clearinghouse_state = self
231 .http_client
232 .info_clearinghouse_state(account_address)
233 .await
234 .context("Failed to fetch clearinghouse state")?;
235
236 let state: crate::http::models::ClearinghouseState =
238 serde_json::from_value(clearinghouse_state)
239 .context("Failed to deserialize clearinghouse state")?;
240
241 tracing::debug!(
242 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
243 state.cross_margin_summary,
244 state.asset_positions.len()
245 );
246
247 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
249 let (balances, margins) =
250 crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
251 .context("Failed to parse account balances and margins")?;
252
253 let ts_event = if let Some(time_ms) = state.time {
254 nautilus_core::UnixNanos::from(time_ms * 1_000_000)
255 } else {
256 nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
257 };
258
259 self.core.generate_account_state(
261 balances, margins, true, ts_event,
263 )?;
264
265 tracing::info!("Account state updated successfully");
266 } else {
267 tracing::warn!("No cross margin summary in clearinghouse state");
268 }
269
270 Ok(())
271 }
272
273 fn get_user_address(&self) -> anyhow::Result<String> {
274 let address = self
277 .http_client
278 .get_user_address()
279 .context("Failed to get user address from HTTP client")?;
280
281 Ok(address)
282 }
283
284 fn spawn_task<F>(&self, description: &'static str, fut: F)
285 where
286 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
287 {
288 let runtime = get_runtime();
289 let handle = runtime.spawn(async move {
290 if let Err(e) = fut.await {
291 tracing::warn!("{description} failed: {e:?}");
292 }
293 });
294
295 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
296 tasks.retain(|handle| !handle.is_finished());
297 tasks.push(handle);
298 }
299
300 fn abort_pending_tasks(&self) {
301 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
302 for handle in tasks.drain(..) {
303 handle.abort();
304 }
305 }
306
307 fn update_account_state(&self) -> anyhow::Result<()> {
308 let runtime = get_runtime();
309 runtime.block_on(self.refresh_account_state())
310 }
311}
312
313impl ExecutionClient for HyperliquidExecutionClient {
    /// Returns this client's local `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected
    }
317
    /// Returns the client ID held by the execution client core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
321
    /// Returns the account ID held by the execution client core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
325
    /// Returns the Hyperliquid venue constant (not the venue stored on the core).
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
329
    /// Returns the order management system type held by the execution client core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
333
    /// Returns the account from the execution client core, if it has one.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
337
    /// Forwards the given balances and margins to the core's account state generator.
    ///
    /// All arguments are passed through unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever error the core reports.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
348
349 fn start(&mut self) -> anyhow::Result<()> {
350 if self.started {
351 return Ok(());
352 }
353
354 tracing::info!("Starting Hyperliquid execution client");
355
356 self.ensure_instruments_initialized()?;
358
359 if let Err(e) = self.update_account_state() {
361 tracing::warn!("Failed to initialize account state: {}", e);
362 }
363
364 self.connected = true;
365 self.started = true;
366
367 if let Err(e) = self.start_ws_stream() {
369 tracing::warn!("Failed to start WebSocket stream: {}", e);
370 }
371
372 tracing::info!("Hyperliquid execution client started");
373 Ok(())
374 }
375 fn stop(&mut self) -> anyhow::Result<()> {
376 if !self.started {
377 return Ok(());
378 }
379
380 tracing::info!("Stopping Hyperliquid execution client");
381
382 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
384 handle.abort();
385 }
386
387 self.abort_pending_tasks();
389
390 if self.connected {
392 let runtime = get_runtime();
393 runtime.block_on(async {
394 if let Err(e) = self.ws_client.disconnect().await {
395 tracing::warn!("Error disconnecting WebSocket client: {e}");
396 }
397 });
398 }
399
400 self.connected = false;
401 self.started = false;
402
403 tracing::info!("Hyperliquid execution client stopped");
404 Ok(())
405 }
406
407 fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
408 let order = &command.order;
409
410 if order.is_closed() {
411 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
412 return Ok(());
413 }
414
415 if let Err(e) = self.validate_order_submission(order) {
417 self.core.generate_order_rejected(
418 order.strategy_id(),
419 order.instrument_id(),
420 order.client_order_id(),
421 &format!("validation-error: {e}"),
422 command.ts_init,
423 false,
424 );
425 return Err(e);
426 }
427
428 self.core.generate_order_submitted(
429 order.strategy_id(),
430 order.instrument_id(),
431 order.client_order_id(),
432 command.ts_init,
433 );
434
435 let http_client = self.http_client.clone();
436 let order_clone = order.clone();
437
438 self.spawn_task("submit_order", async move {
439 match order_any_to_hyperliquid_request(&order_clone) {
440 Ok(hyperliquid_order) => {
441 let action = ExchangeAction::order(vec![hyperliquid_order]);
443
444 match http_client.post_action(&action).await {
445 Ok(response) => {
446 if is_response_successful(&response) {
447 tracing::info!("Order submitted successfully: {:?}", response);
448 } else {
451 let error_msg = extract_error_message(&response);
452 tracing::warn!(
453 "Order submission rejected by exchange: {}",
454 error_msg
455 );
456 }
458 }
459 Err(e) => {
460 tracing::warn!("Order submission HTTP request failed: {e}");
461 }
463 }
464 }
465 Err(e) => {
466 tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
467 }
469 }
470
471 Ok(())
472 });
473
474 Ok(())
475 }
476
477 fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
478 tracing::debug!(
479 "Submitting order list with {} orders",
480 command.order_list.orders.len()
481 );
482
483 let http_client = self.http_client.clone();
484 let orders: Vec<OrderAny> = command.order_list.orders.clone();
485
486 for order in &orders {
488 self.core.generate_order_submitted(
489 order.strategy_id(),
490 order.instrument_id(),
491 order.client_order_id(),
492 command.ts_init,
493 );
494 }
495
496 self.spawn_task("submit_order_list", async move {
497 let order_refs: Vec<&OrderAny> = orders.iter().collect();
499 match orders_to_hyperliquid_requests(&order_refs) {
500 Ok(hyperliquid_orders) => {
501 let action = ExchangeAction::order(hyperliquid_orders);
503 match http_client.post_action(&action).await {
504 Ok(response) => {
505 if is_response_successful(&response) {
506 tracing::info!("Order list submitted successfully: {:?}", response);
507 } else {
509 let error_msg = extract_error_message(&response);
510 tracing::warn!(
511 "Order list submission rejected by exchange: {}",
512 error_msg
513 );
514 }
516 }
517 Err(e) => {
518 tracing::warn!("Order list submission HTTP request failed: {e}");
519 }
521 }
522 }
523 Err(e) => {
524 tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
525 }
526 }
527
528 Ok(())
529 });
530
531 Ok(())
532 }
533
534 fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
535 tracing::debug!("Modifying order: {:?}", command);
536
537 let oid: u64 = match command.venue_order_id.as_str().parse() {
539 Ok(id) => id,
540 Err(e) => {
541 tracing::warn!(
542 "Failed to parse venue_order_id '{}' as u64: {}",
543 command.venue_order_id,
544 e
545 );
546 return Ok(());
547 }
548 };
549
550 let http_client = self.http_client.clone();
551 let price = command.price;
552 let quantity = command.quantity;
553 let symbol = command.instrument_id.symbol.to_string();
554
555 self.spawn_task("modify_order", async move {
556 use crate::{
557 common::parse::extract_asset_id_from_symbol,
558 http::models::HyperliquidExecModifyOrderRequest,
559 };
560
561 let asset = match extract_asset_id_from_symbol(&symbol) {
563 Ok(asset) => asset,
564 Err(e) => {
565 tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
566 return Ok(());
567 }
568 };
569
570 let modify_request = HyperliquidExecModifyOrderRequest {
572 asset,
573 oid,
574 price: price.map(|p| (*p).into()),
575 size: quantity.map(|q| (*q).into()),
576 reduce_only: None,
577 kind: None,
578 };
579
580 let action = ExchangeAction::modify(oid, modify_request);
581
582 match http_client.post_action(&action).await {
583 Ok(response) => {
584 if is_response_successful(&response) {
585 tracing::info!("Order modified successfully: {:?}", response);
586 } else {
588 let error_msg = extract_error_message(&response);
589 tracing::warn!("Order modification rejected by exchange: {}", error_msg);
590 }
592 }
593 Err(e) => {
594 tracing::warn!("Order modification HTTP request failed: {e}");
595 }
597 }
598
599 Ok(())
600 });
601
602 Ok(())
603 }
604
605 fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
606 tracing::debug!("Cancelling order: {:?}", command);
607
608 let http_client = self.http_client.clone();
609 let client_order_id = command.client_order_id.to_string();
610 let symbol = command.instrument_id.symbol.to_string();
611
612 self.spawn_task("cancel_order", async move {
613 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
614 Ok(cancel_request) => {
615 let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
617 match http_client.post_action(&action).await {
618 Ok(response) => {
619 if is_response_successful(&response) {
620 tracing::info!("Order cancelled successfully: {:?}", response);
621 } else {
624 let error_msg = extract_error_message(&response);
625 tracing::warn!(
626 "Order cancellation rejected by exchange: {}",
627 error_msg
628 );
629 }
631 }
632 Err(e) => {
633 tracing::warn!("Order cancellation HTTP request failed: {e}");
634 }
636 }
637 }
638 Err(e) => {
639 tracing::warn!(
640 "Failed to convert order to Hyperliquid cancel format: {:?}",
641 e
642 );
643 }
644 }
645
646 Ok(())
647 });
648
649 Ok(())
650 }
651
652 fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
653 tracing::debug!("Cancelling all orders: {:?}", command);
654
655 let cache = self.core.cache().borrow();
657 let open_orders = cache.orders_open(
658 Some(&self.core.venue),
659 Some(&command.instrument_id),
660 None,
661 Some(command.order_side),
662 );
663
664 if open_orders.is_empty() {
665 tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
666 return Ok(());
667 }
668
669 let mut cancel_requests = Vec::new();
671 for order in open_orders {
672 let client_order_id = order.client_order_id().to_string();
673 let symbol = command.instrument_id.symbol.to_string();
674
675 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
676 Ok(req) => cancel_requests.push(req),
677 Err(e) => {
678 tracing::warn!(
679 "Failed to convert order {} to cancel request: {}",
680 client_order_id,
681 e
682 );
683 continue;
684 }
685 }
686 }
687
688 if cancel_requests.is_empty() {
689 tracing::debug!("No valid cancel requests to send");
690 return Ok(());
691 }
692
693 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
695
696 let http_client = self.http_client.clone();
699 let runtime = get_runtime();
700 runtime.spawn(async move {
701 if let Err(e) = http_client.post_action(&action).await {
702 tracing::warn!("Failed to send cancel all orders request: {}", e);
703 }
704 });
705
706 Ok(())
707 }
708
709 fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
710 tracing::debug!("Batch cancelling orders: {:?}", command);
711
712 if command.cancels.is_empty() {
713 tracing::debug!("No orders to cancel in batch");
714 return Ok(());
715 }
716
717 let mut cancel_requests = Vec::new();
719 for cancel_cmd in &command.cancels {
720 let client_order_id = cancel_cmd.client_order_id.to_string();
721 let symbol = cancel_cmd.instrument_id.symbol.to_string();
722
723 match client_order_id_to_cancel_request(&client_order_id, &symbol) {
724 Ok(req) => cancel_requests.push(req),
725 Err(e) => {
726 tracing::warn!(
727 "Failed to convert order {} to cancel request: {}",
728 client_order_id,
729 e
730 );
731 continue;
732 }
733 }
734 }
735
736 if cancel_requests.is_empty() {
737 tracing::warn!("No valid cancel requests in batch");
738 return Ok(());
739 }
740
741 let action = ExchangeAction::cancel_by_cloid(cancel_requests);
742
743 let http_client = self.http_client.clone();
746 let runtime = get_runtime();
747 runtime.spawn(async move {
748 if let Err(e) = http_client.post_action(&action).await {
749 tracing::warn!("Failed to send batch cancel orders request: {}", e);
750 }
751 });
752
753 Ok(())
754 }
755
756 fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
757 tracing::debug!("Querying account: {:?}", command);
758
759 let runtime = get_runtime();
761 runtime.block_on(async {
762 if let Err(e) = self.refresh_account_state().await {
763 tracing::warn!("Failed to query account state: {}", e);
764 }
765 });
766
767 Ok(())
768 }
769
770 fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
771 tracing::debug!("Querying order: {:?}", command);
772
773 let cache = self.core.cache().borrow();
775 let venue_order_id = cache.venue_order_id(&command.client_order_id);
776
777 let venue_order_id = match venue_order_id {
778 Some(oid) => *oid,
779 None => {
780 tracing::warn!(
781 "No venue order ID found for client order {}",
782 command.client_order_id
783 );
784 return Ok(());
785 }
786 };
787 drop(cache);
788
789 let oid = match u64::from_str(venue_order_id.as_ref()) {
791 Ok(id) => id,
792 Err(e) => {
793 tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
794 return Ok(());
795 }
796 };
797
798 let user_address = self.get_user_address()?;
800
801 let http_client = self.http_client.clone();
805 let runtime = get_runtime();
806 runtime.spawn(async move {
807 match http_client.info_order_status(&user_address, oid).await {
808 Ok(status) => {
809 tracing::debug!("Order status for oid {}: {:?}", oid, status);
810 }
811 Err(e) => {
812 tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
813 }
814 }
815 });
816
817 Ok(())
818 }
819}
820
821use async_trait::async_trait;
826use nautilus_common::messages::execution::{
827 GenerateFillReports, GenerateOrderStatusReport, GeneratePositionReports,
828};
829use nautilus_execution::client::LiveExecutionClient;
830use nautilus_model::reports::{
831 ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport,
832};
833
834#[async_trait(?Send)]
835impl LiveExecutionClient for HyperliquidExecutionClient {
836 async fn connect(&mut self) -> anyhow::Result<()> {
837 if self.connected {
838 return Ok(());
839 }
840
841 tracing::info!("Connecting Hyperliquid execution client");
842
843 self.ensure_instruments_initialized_async().await?;
845
846 let url = crate::common::consts::ws_url(self.config.is_testnet);
848 self.ws_client = HyperliquidWebSocketClient::connect(url).await?;
849
850 let user_address = self.get_user_address()?;
852 self.ws_client
853 .subscribe_all_user_channels(&user_address)
854 .await?;
855
856 self.refresh_account_state().await?;
858
859 self.connected = true;
863 self.core.set_connected(true);
864
865 tracing::info!(
866 "Hyperliquid execution client {} connected",
867 self.core.client_id
868 );
869 Ok(())
870 }
871
872 async fn disconnect(&mut self) -> anyhow::Result<()> {
873 if !self.connected {
874 return Ok(());
875 }
876
877 tracing::info!("Disconnecting Hyperliquid execution client");
878
879 self.ws_client.disconnect().await?;
881
882 self.abort_pending_tasks();
884
885 self.connected = false;
886 self.core.set_connected(false);
887
888 tracing::info!(
889 "Hyperliquid execution client {} disconnected",
890 self.core.client_id
891 );
892 Ok(())
893 }
894
    /// Stub: logs a warning and returns `Ok(None)` without querying the venue.
    ///
    /// The command is currently ignored.
    async fn generate_order_status_report(
        &self,
        _cmd: &GenerateOrderStatusReport,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        tracing::warn!("generate_order_status_report not yet fully implemented");
        Ok(None)
    }
905
906 async fn generate_order_status_reports(
907 &self,
908 cmd: &GenerateOrderStatusReport,
909 ) -> anyhow::Result<Vec<OrderStatusReport>> {
910 tracing::warn!("generate_order_status_reports requires instrument cache integration");
918
919 if let Some(instrument_id) = cmd.instrument_id {
921 tracing::debug!("Would query orders for instrument: {}", instrument_id);
922 }
923 if let Some(client_order_id) = cmd.client_order_id {
924 tracing::debug!("Would filter by client_order_id: {}", client_order_id);
925 }
926 if let Some(venue_order_id) = cmd.venue_order_id {
927 tracing::debug!("Would filter by venue_order_id: {}", venue_order_id);
928 }
929
930 Ok(Vec::new())
931 }
932
933 async fn generate_fill_reports(
934 &self,
935 cmd: GenerateFillReports,
936 ) -> anyhow::Result<Vec<FillReport>> {
937 tracing::warn!("generate_fill_reports requires instrument cache integration");
945
946 if let Some(start) = cmd.start {
948 tracing::debug!("Would filter fills from: {}", start);
949 }
950 if let Some(end) = cmd.end {
951 tracing::debug!("Would filter fills until: {}", end);
952 }
953 if let Some(instrument_id) = cmd.instrument_id {
954 tracing::debug!("Would filter fills for instrument: {}", instrument_id);
955 }
956
957 Ok(Vec::new())
958 }
959
960 async fn generate_position_status_reports(
961 &self,
962 cmd: &GeneratePositionReports,
963 ) -> anyhow::Result<Vec<PositionStatusReport>> {
964 let user_address = self.get_user_address()?;
966
967 let _response = self
969 .http_client
970 .info_clearinghouse_state(&user_address)
971 .await
972 .context("Failed to fetch clearinghouse state")?;
973
974 tracing::warn!("Position status report parsing requires instrument cache integration");
982
983 if cmd.instrument_id.is_some() {
990 tracing::debug!(
991 "Would filter positions by instrument_id: {:?}",
992 cmd.instrument_id
993 );
994 }
995
996 Ok(Vec::new())
997 }
998
    /// Stub: logs a warning that includes `lookback_mins` and returns `Ok(None)`.
    async fn generate_mass_status(
        &self,
        lookback_mins: Option<u64>,
    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
        tracing::warn!(
            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
        );
        Ok(None)
    }
1013}
1014
impl LiveExecutionClientExt for HyperliquidExecutionClient {
    /// Returns the execution event sender obtained from `get_exec_event_sender`.
    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
        get_exec_event_sender()
    }

    /// Returns a shared borrow of the clock held by the execution client core.
    fn get_clock(&self) -> Ref<'_, dyn Clock> {
        self.core.clock().borrow()
    }
}
1024
1025impl HyperliquidExecutionClient {
1026 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1027 let mut handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1028 if handle_guard.is_some() {
1029 return Ok(());
1030 }
1031
1032 let user_address = self.get_user_address()?;
1034 let account_id = self.core.account_id;
1035 let ws_client = self.ws_client.clone();
1036
1037 let runtime = get_runtime();
1040 let instruments = runtime.block_on(async {
1041 self.http_client
1042 .request_instruments()
1043 .await
1044 .unwrap_or_default()
1045 });
1046
1047 for instrument in instruments {
1048 ws_client.add_instrument(instrument);
1049 }
1050
1051 let handle = runtime.spawn(async move {
1053 if let Err(e) = ws_client.ensure_connected().await {
1055 tracing::warn!("Failed to connect WebSocket: {}", e);
1056 return;
1057 }
1058
1059 if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1060 tracing::warn!("Failed to subscribe to order updates: {}", e);
1061 return;
1062 }
1063
1064 if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1065 tracing::warn!("Failed to subscribe to user events: {}", e);
1066 return;
1067 }
1068
1069 tracing::info!("Subscribed to Hyperliquid execution updates");
1070
1071 let clock = get_atomic_clock_realtime();
1072
1073 loop {
1075 let event = ws_client.next_event().await;
1076
1077 match event {
1078 Some(msg) => {
1079 match &msg {
1080 HyperliquidWsMsg::OrderUpdates { data } => {
1081 let mut exec_reports = Vec::new();
1082
1083 for order_update in data {
1085 if let Some(instrument) =
1086 ws_client.get_instrument_by_symbol(&order_update.order.coin)
1087 {
1088 let ts_init = clock.get_time_ns();
1089
1090 match parse_ws_order_status_report(
1091 order_update,
1092 &instrument,
1093 account_id,
1094 ts_init,
1095 ) {
1096 Ok(report) => {
1097 exec_reports.push(ExecutionReport::Order(report));
1098 }
1099 Err(e) => {
1100 tracing::warn!("Error parsing order update: {}", e);
1101 }
1102 }
1103 } else {
1104 tracing::warn!(
1105 "No instrument found for symbol: {}",
1106 order_update.order.coin
1107 );
1108 }
1109 }
1110
1111 if !exec_reports.is_empty() {
1113 for report in exec_reports {
1114 dispatch_execution_report(report);
1115 }
1116 }
1117 }
1118 HyperliquidWsMsg::UserEvents { data } => {
1119 use crate::websocket::messages::WsUserEventData;
1120
1121 let ts_init = clock.get_time_ns();
1122
1123 match data {
1124 WsUserEventData::Fills { fills } => {
1125 let mut exec_reports = Vec::new();
1126
1127 for fill in fills {
1129 if let Some(instrument) =
1130 ws_client.get_instrument_by_symbol(&fill.coin)
1131 {
1132 match parse_ws_fill_report(
1133 fill,
1134 &instrument,
1135 account_id,
1136 ts_init,
1137 ) {
1138 Ok(report) => {
1139 exec_reports
1140 .push(ExecutionReport::Fill(report));
1141 }
1142 Err(e) => {
1143 tracing::warn!("Error parsing fill: {}", e);
1144 }
1145 }
1146 } else {
1147 tracing::warn!(
1148 "No instrument found for symbol: {}",
1149 fill.coin
1150 );
1151 }
1152 }
1153
1154 if !exec_reports.is_empty() {
1156 for report in exec_reports {
1157 dispatch_execution_report(report);
1158 }
1159 }
1160 }
1161 _ => {
1162 }
1164 }
1165 }
1166 _ => {
1167 }
1169 }
1170 }
1171 None => {
1172 tracing::warn!("Hyperliquid WebSocket connection closed");
1174 break;
1175 }
1176 }
1177 }
1178 });
1179
1180 *handle_guard = Some(handle);
1181 tracing::info!("Hyperliquid WebSocket execution stream started");
1182 Ok(())
1183 }
1184}
1185
1186fn dispatch_execution_report(report: ExecutionReport) {
1187 let sender = get_exec_event_sender();
1188 match report {
1189 ExecutionReport::Order(order_report) => {
1190 let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1191 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1192 tracing::warn!("Failed to send order status report: {e}");
1193 }
1194 }
1195 ExecutionReport::Fill(fill_report) => {
1196 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1197 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1198 tracing::warn!("Failed to send fill report: {e}");
1199 }
1200 }
1201 }
1202}
1203
1204pub use crate::http::models::{
1206 AssetId, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
1207 HyperliquidExecCancelByCloidRequest, HyperliquidExecCancelOrderRequest,
1208 HyperliquidExecCancelResponseData, HyperliquidExecCancelStatus, HyperliquidExecFilledInfo,
1209 HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecModifyOrderRequest,
1210 HyperliquidExecModifyResponseData, HyperliquidExecModifyStatus, HyperliquidExecOrderKind,
1211 HyperliquidExecOrderResponseData, HyperliquidExecOrderStatus, HyperliquidExecPlaceOrderRequest,
1212 HyperliquidExecRequest, HyperliquidExecResponse, HyperliquidExecResponseData,
1213 HyperliquidExecRestingInfo, HyperliquidExecTif, HyperliquidExecTpSl,
1214 HyperliquidExecTriggerParams, HyperliquidExecTwapRequest, OrderId,
1215};