nautilus_hyperliquid/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use nautilus_common::{
23    live::{runner::get_exec_event_sender, runtime::get_runtime},
24    messages::{
25        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
26        execution::{
27            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
29            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
30        },
31    },
32};
33use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
34use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
35use nautilus_model::{
36    accounts::AccountAny,
37    enums::{OmsType, OrderType},
38    identifiers::{AccountId, ClientId, Venue},
39    orders::{Order, any::OrderAny},
40    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
41    types::{AccountBalance, MarginBalance},
42};
43use serde_json;
44use tokio::task::JoinHandle;
45
46use crate::{
47    common::{
48        HyperliquidProductType,
49        consts::HYPERLIQUID_VENUE,
50        credential::Secrets,
51        parse::{
52            client_order_id_to_cancel_request, extract_error_message, is_response_successful,
53            order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
54        },
55    },
56    config::HyperliquidExecClientConfig,
57    http::{client::HyperliquidHttpClient, models::ClearinghouseState, query::ExchangeAction},
58    websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
59};
60
/// Live execution client for the Hyperliquid venue.
///
/// Bundles the shared execution client core with the Hyperliquid HTTP and WebSocket
/// clients, lifecycle flags, and the handles of background tasks spawned on the runtime.
#[derive(Debug)]
pub struct HyperliquidExecutionClient {
    /// Shared execution client core (identifiers, cache access, event generation).
    core: ExecutionClientCore,
    /// Client configuration (credentials, testnet flag, vault address, proxy/timeout settings).
    config: HyperliquidExecClientConfig,
    /// HTTP client used for instrument bootstrap, account queries, and exchange actions.
    http_client: HyperliquidHttpClient,
    /// WebSocket client used for user channel subscriptions.
    ws_client: HyperliquidWebSocketClient,
    /// Set by `start` and cleared by `stop`; makes both calls idempotent.
    started: bool,
    /// Connection flag reported by `is_connected`.
    connected: bool,
    /// Set once instruments have been requested and cached on the HTTP client.
    instruments_initialized: bool,
    /// Handles of background tasks created via `spawn_task`; aborted on `stop`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the WebSocket stream task, when present; taken and aborted on `stop`.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
}
73
74impl HyperliquidExecutionClient {
    /// Returns a shared reference to the configuration this client was constructed with.
    pub fn config(&self) -> &HyperliquidExecClientConfig {
        &self.config
    }
79
80    /// Validates order before submission to catch issues early.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the order cannot be submitted to Hyperliquid.
85    ///
86    /// # Supported Order Types
87    ///
88    /// - `Market`: Standard market orders
89    /// - `Limit`: Limit orders with GTC/IOC/ALO time-in-force
90    /// - `StopMarket`: Stop loss / protective stop with market execution
91    /// - `StopLimit`: Stop loss / protective stop with limit price
92    /// - `MarketIfTouched`: Profit taking / entry order with market execution
93    /// - `LimitIfTouched`: Profit taking / entry order with limit price
94    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
95        // Check if instrument symbol is supported
96        // Hyperliquid instruments: {base}-USD-PERP or {base}-{quote}-SPOT
97        let instrument_id = order.instrument_id();
98        let symbol = instrument_id.symbol.as_str();
99        if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
100            anyhow::bail!(
101                "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
102            );
103        }
104
105        // Check if order type is supported
106        match order.order_type() {
107            OrderType::Market
108            | OrderType::Limit
109            | OrderType::StopMarket
110            | OrderType::StopLimit
111            | OrderType::MarketIfTouched
112            | OrderType::LimitIfTouched => {}
113            _ => anyhow::bail!(
114                "Unsupported order type for Hyperliquid: {:?}",
115                order.order_type()
116            ),
117        }
118
119        // Check if conditional orders have trigger price
120        if matches!(
121            order.order_type(),
122            OrderType::StopMarket
123                | OrderType::StopLimit
124                | OrderType::MarketIfTouched
125                | OrderType::LimitIfTouched
126        ) && order.trigger_price().is_none()
127        {
128            anyhow::bail!(
129                "Conditional orders require a trigger price for Hyperliquid: {:?}",
130                order.order_type()
131            );
132        }
133
134        // Check if limit-based orders have price
135        if matches!(
136            order.order_type(),
137            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
138        ) && order.price().is_none()
139        {
140            anyhow::bail!(
141                "Limit orders require a limit price for Hyperliquid: {:?}",
142                order.order_type()
143            );
144        }
145
146        Ok(())
147    }
148
149    /// Creates a new [`HyperliquidExecutionClient`].
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if either the HTTP or WebSocket client fail to construct.
154    pub fn new(
155        core: ExecutionClientCore,
156        config: HyperliquidExecClientConfig,
157    ) -> anyhow::Result<Self> {
158        if !config.has_credentials() {
159            anyhow::bail!("Hyperliquid execution client requires private key");
160        }
161
162        let secrets = Secrets::from_json(&format!(
163            r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
164            config.private_key, config.is_testnet
165        ))
166        .context("failed to create secrets from private key")?;
167
168        let http_client = HyperliquidHttpClient::with_credentials(
169            &secrets,
170            Some(config.http_timeout_secs),
171            config.http_proxy_url.clone(),
172        )
173        .context("failed to create Hyperliquid HTTP client")?;
174
175        // Create WebSocket client (will connect when needed)
176        // Note: For execution WebSocket (private account messages), product type is less critical
177        // since messages are account-scoped. Defaulting to Perp.
178        let ws_client = HyperliquidWebSocketClient::new(
179            None,
180            config.is_testnet,
181            HyperliquidProductType::Perp,
182            Some(core.account_id),
183        );
184
185        Ok(Self {
186            core,
187            config,
188            http_client,
189            ws_client,
190            started: false,
191            connected: false,
192            instruments_initialized: false,
193            pending_tasks: Mutex::new(Vec::new()),
194            ws_stream_handle: Mutex::new(None),
195        })
196    }
197
198    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
199        if self.instruments_initialized {
200            return Ok(());
201        }
202
203        let instruments = self
204            .http_client
205            .request_instruments()
206            .await
207            .context("failed to request Hyperliquid instruments")?;
208
209        if instruments.is_empty() {
210            tracing::warn!(
211                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
212            );
213        } else {
214            tracing::info!("Initialized {} instruments", instruments.len());
215
216            for instrument in &instruments {
217                self.http_client.cache_instrument(instrument.clone());
218            }
219        }
220
221        self.instruments_initialized = true;
222        Ok(())
223    }
224
225    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
226        if self.instruments_initialized {
227            return Ok(());
228        }
229
230        let runtime = get_runtime();
231        runtime.block_on(self.ensure_instruments_initialized_async())
232    }
233
234    async fn refresh_account_state(&self) -> anyhow::Result<()> {
235        // Get account information from Hyperliquid using the user address
236        // We need to derive the user address from the private key in the config
237        let user_address = self.get_user_address()?;
238
239        // Use vault address if configured, otherwise use user address
240        let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
241
242        // Query clearinghouseState endpoint to get balances and margin info
243        let clearinghouse_state = self
244            .http_client
245            .info_clearinghouse_state(account_address)
246            .await
247            .context("failed to fetch clearinghouse state")?;
248
249        // Deserialize the response
250        let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
251            .context("failed to deserialize clearinghouse state")?;
252
253        tracing::debug!(
254            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
255            state.cross_margin_summary,
256            state.asset_positions.len()
257        );
258
259        // Parse balances and margins from cross margin summary
260        if let Some(ref cross_margin_summary) = state.cross_margin_summary {
261            let (balances, margins) =
262                crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
263                    .context("failed to parse account balances and margins")?;
264
265            let ts_event = if let Some(time_ms) = state.time {
266                nautilus_core::UnixNanos::from(time_ms * 1_000_000)
267            } else {
268                nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
269            };
270
271            // Generate account state event
272            self.core.generate_account_state(
273                balances, margins, true, // reported
274                ts_event,
275            )?;
276
277            tracing::info!("Account state updated successfully");
278        } else {
279            tracing::warn!("No cross margin summary in clearinghouse state");
280        }
281
282        Ok(())
283    }
284
285    fn get_user_address(&self) -> anyhow::Result<String> {
286        let address = self
287            .http_client
288            .get_user_address()
289            .context("failed to get user address from HTTP client")?;
290
291        Ok(address)
292    }
293
294    fn spawn_task<F>(&self, description: &'static str, fut: F)
295    where
296        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
297    {
298        let runtime = get_runtime();
299        let handle = runtime.spawn(async move {
300            if let Err(e) = fut.await {
301                tracing::warn!("{description} failed: {e:?}");
302            }
303        });
304
305        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
306        tasks.retain(|handle| !handle.is_finished());
307        tasks.push(handle);
308    }
309
310    fn abort_pending_tasks(&self) {
311        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
312        for handle in tasks.drain(..) {
313            handle.abort();
314        }
315    }
316
317    fn update_account_state(&self) -> anyhow::Result<()> {
318        let runtime = get_runtime();
319        runtime.block_on(self.refresh_account_state())
320    }
321}
322
323#[async_trait(?Send)]
324impl ExecutionClient for HyperliquidExecutionClient {
    /// Returns the current value of the client's `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected
    }
328
    /// Returns the client ID held by the execution client core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
332
    /// Returns the account ID held by the execution client core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
336
    /// Returns the Hyperliquid venue constant (`HYPERLIQUID_VENUE`).
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
340
    /// Returns the OMS type held by the execution client core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
344
    /// Returns the account as provided by the execution client core, if any.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
348
    /// Generates an account state event by delegating to the execution client core.
    ///
    /// All arguments are forwarded unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever error the core's `generate_account_state` returns.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
359
360    fn start(&mut self) -> anyhow::Result<()> {
361        if self.started {
362            return Ok(());
363        }
364
365        tracing::info!(
366            client_id = %self.core.client_id,
367            account_id = %self.core.account_id,
368            is_testnet = self.config.is_testnet,
369            vault_address = ?self.config.vault_address,
370            http_proxy_url = ?self.config.http_proxy_url,
371            ws_proxy_url = ?self.config.ws_proxy_url,
372            "Starting Hyperliquid execution client"
373        );
374
375        // Ensure instruments are initialized
376        self.ensure_instruments_initialized()?;
377
378        // Initialize account state
379        if let Err(e) = self.update_account_state() {
380            tracing::warn!("Failed to initialize account state: {e}");
381        }
382
383        self.connected = true;
384        self.started = true;
385
386        // Start WebSocket stream for execution updates
387        if let Err(e) = get_runtime().block_on(self.start_ws_stream()) {
388            tracing::warn!("Failed to start WebSocket stream: {e}");
389        }
390
391        tracing::info!("Hyperliquid execution client started");
392        Ok(())
393    }
394    fn stop(&mut self) -> anyhow::Result<()> {
395        if !self.started {
396            return Ok(());
397        }
398
399        tracing::info!("Stopping Hyperliquid execution client");
400
401        // Stop WebSocket stream
402        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
403            handle.abort();
404        }
405
406        // Abort any pending tasks
407        self.abort_pending_tasks();
408
409        // Disconnect WebSocket
410        if self.connected {
411            let runtime = get_runtime();
412            runtime.block_on(async {
413                if let Err(e) = self.ws_client.disconnect().await {
414                    tracing::warn!("Error disconnecting WebSocket client: {e}");
415                }
416            });
417        }
418
419        self.connected = false;
420        self.started = false;
421
422        tracing::info!("Hyperliquid execution client stopped");
423        Ok(())
424    }
425
426    fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
427        let order = &command.order;
428
429        if order.is_closed() {
430            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
431            return Ok(());
432        }
433
434        if let Err(e) = self.validate_order_submission(order) {
435            self.core.generate_order_rejected(
436                order.strategy_id(),
437                order.instrument_id(),
438                order.client_order_id(),
439                &format!("validation-error: {e}"),
440                command.ts_init,
441                false,
442            );
443            return Err(e);
444        }
445
446        self.core.generate_order_submitted(
447            order.strategy_id(),
448            order.instrument_id(),
449            order.client_order_id(),
450            command.ts_init,
451        );
452
453        let http_client = self.http_client.clone();
454        let order_clone = order.clone();
455
456        self.spawn_task("submit_order", async move {
457            match order_any_to_hyperliquid_request(&order_clone) {
458                Ok(hyperliquid_order) => {
459                    // Create exchange action for order placement with typed struct
460                    let action = ExchangeAction::order(vec![hyperliquid_order]);
461
462                    match http_client.post_action(&action).await {
463                        Ok(response) => {
464                            if is_response_successful(&response) {
465                                tracing::info!("Order submitted successfully: {:?}", response);
466                                // Order acceptance/rejection events will be generated from WebSocket updates
467                                // which provide the venue_order_id and definitive status
468                            } else {
469                                let error_msg = extract_error_message(&response);
470                                tracing::warn!(
471                                    "Order submission rejected by exchange: {}",
472                                    error_msg
473                                );
474                                // Order rejection event will be generated from WebSocket updates
475                            }
476                        }
477                        Err(e) => {
478                            tracing::warn!("Order submission HTTP request failed: {e}");
479                            // WebSocket reconnection and order reconciliation will handle recovery
480                        }
481                    }
482                }
483                Err(e) => {
484                    tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
485                    // This indicates a client-side bug or unsupported order configuration
486                }
487            }
488
489            Ok(())
490        });
491
492        Ok(())
493    }
494
495    fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
496        tracing::debug!(
497            "Submitting order list with {} orders",
498            command.order_list.orders.len()
499        );
500
501        let http_client = self.http_client.clone();
502        let orders: Vec<OrderAny> = command.order_list.orders.clone();
503
504        // Generate submitted events for all orders
505        for order in &orders {
506            self.core.generate_order_submitted(
507                order.strategy_id(),
508                order.instrument_id(),
509                order.client_order_id(),
510                command.ts_init,
511            );
512        }
513
514        self.spawn_task("submit_order_list", async move {
515            // Convert all orders to Hyperliquid format
516            let order_refs: Vec<&OrderAny> = orders.iter().collect();
517            match orders_to_hyperliquid_requests(&order_refs) {
518                Ok(hyperliquid_orders) => {
519                    // Create exchange action for order placement with typed struct
520                    let action = ExchangeAction::order(hyperliquid_orders);
521                    match http_client.post_action(&action).await {
522                        Ok(response) => {
523                            if is_response_successful(&response) {
524                                tracing::info!("Order list submitted successfully: {:?}", response);
525                                // Order acceptance/rejection events will be generated from WebSocket updates
526                            } else {
527                                let error_msg = extract_error_message(&response);
528                                tracing::warn!(
529                                    "Order list submission rejected by exchange: {}",
530                                    error_msg
531                                );
532                                // Individual order rejection events will be generated from WebSocket updates
533                            }
534                        }
535                        Err(e) => {
536                            tracing::warn!("Order list submission HTTP request failed: {e}");
537                            // WebSocket reconciliation will handle recovery
538                        }
539                    }
540                }
541                Err(e) => {
542                    tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
543                }
544            }
545
546            Ok(())
547        });
548
549        Ok(())
550    }
551
552    fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
553        tracing::debug!("Modifying order: {:?}", command);
554
555        // Parse venue_order_id as u64
556        let venue_order_id = match command.venue_order_id {
557            Some(id) => id,
558            None => {
559                tracing::warn!("Cannot modify order: venue_order_id is None");
560                return Ok(());
561            }
562        };
563
564        let oid: u64 = match venue_order_id.as_str().parse() {
565            Ok(id) => id,
566            Err(e) => {
567                tracing::warn!(
568                    "Failed to parse venue_order_id '{}' as u64: {}",
569                    venue_order_id,
570                    e
571                );
572                return Ok(());
573            }
574        };
575
576        let http_client = self.http_client.clone();
577        let price = command.price;
578        let quantity = command.quantity;
579        let symbol = command.instrument_id.symbol.inner();
580
581        self.spawn_task("modify_order", async move {
582            use crate::{
583                common::parse::extract_asset_id_from_symbol,
584                http::models::HyperliquidExecModifyOrderRequest,
585            };
586
587            // Extract asset ID from instrument symbol
588            let asset = match extract_asset_id_from_symbol(&symbol) {
589                Ok(asset) => asset,
590                Err(e) => {
591                    tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
592                    return Ok(());
593                }
594            };
595
596            // Build typed modify request with new price and/or quantity
597            let modify_request = HyperliquidExecModifyOrderRequest {
598                asset,
599                oid,
600                price: price.map(|p| (*p).into()),
601                size: quantity.map(|q| (*q).into()),
602                reduce_only: None,
603                kind: None,
604            };
605
606            let action = ExchangeAction::modify(oid, modify_request);
607
608            match http_client.post_action(&action).await {
609                Ok(response) => {
610                    if is_response_successful(&response) {
611                        tracing::info!("Order modified successfully: {:?}", response);
612                        // Order update events will be generated from WebSocket updates
613                    } else {
614                        let error_msg = extract_error_message(&response);
615                        tracing::warn!("Order modification rejected by exchange: {}", error_msg);
616                        // Order modify rejected events will be generated from WebSocket updates
617                    }
618                }
619                Err(e) => {
620                    tracing::warn!("Order modification HTTP request failed: {e}");
621                    // WebSocket reconciliation will handle recovery
622                }
623            }
624
625            Ok(())
626        });
627
628        Ok(())
629    }
630
631    fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
632        tracing::debug!("Cancelling order: {:?}", command);
633
634        let http_client = self.http_client.clone();
635        let client_order_id = command.client_order_id.inner();
636        let symbol = command.instrument_id.symbol.inner();
637
638        self.spawn_task("cancel_order", async move {
639            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
640                Ok(cancel_request) => {
641                    // Create exchange action for order cancellation with typed struct
642                    let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
643                    match http_client.post_action(&action).await {
644                        Ok(response) => {
645                            if is_response_successful(&response) {
646                                tracing::info!("Order cancelled successfully: {:?}", response);
647                                // Order cancelled events will be generated from WebSocket updates
648                                // which provide definitive confirmation and venue_order_id
649                            } else {
650                                let error_msg = extract_error_message(&response);
651                                tracing::warn!(
652                                    "Order cancellation rejected by exchange: {}",
653                                    error_msg
654                                );
655                                // Order cancel rejected events will be generated from WebSocket updates
656                            }
657                        }
658                        Err(e) => {
659                            tracing::warn!("Order cancellation HTTP request failed: {e}");
660                            // WebSocket reconnection and reconciliation will handle recovery
661                        }
662                    }
663                }
664                Err(e) => {
665                    tracing::warn!(
666                        "Failed to convert order to Hyperliquid cancel format: {:?}",
667                        e
668                    );
669                }
670            }
671
672            Ok(())
673        });
674
675        Ok(())
676    }
677
678    fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
679        tracing::debug!("Cancelling all orders: {:?}", command);
680
681        // Query cache for all open orders matching the instrument and side
682        let cache = self.core.cache().borrow();
683        let open_orders = cache.orders_open(
684            Some(&self.core.venue),
685            Some(&command.instrument_id),
686            None,
687            Some(command.order_side),
688        );
689
690        if open_orders.is_empty() {
691            tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
692            return Ok(());
693        }
694
695        // Convert orders to cancel requests
696        let mut cancel_requests = Vec::new();
697        let symbol = command.instrument_id.symbol.inner();
698        for order in open_orders {
699            let client_order_id = order.client_order_id().inner();
700
701            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
702                Ok(req) => cancel_requests.push(req),
703                Err(e) => {
704                    tracing::warn!(
705                        "Failed to convert order {} to cancel request: {}",
706                        client_order_id,
707                        e
708                    );
709                    continue;
710                }
711            }
712        }
713
714        if cancel_requests.is_empty() {
715            tracing::debug!("No valid cancel requests to send");
716            return Ok(());
717        }
718
719        // Create exchange action for cancellation with typed struct
720        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
721
722        // Send cancel request via HTTP API
723        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
724        let http_client = self.http_client.clone();
725        let runtime = get_runtime();
726        runtime.spawn(async move {
727            if let Err(e) = http_client.post_action(&action).await {
728                tracing::warn!("Failed to send cancel all orders request: {e}");
729            }
730        });
731
732        Ok(())
733    }
734
735    fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
736        tracing::debug!("Batch cancelling orders: {:?}", command);
737
738        if command.cancels.is_empty() {
739            tracing::debug!("No orders to cancel in batch");
740            return Ok(());
741        }
742
743        // Convert each CancelOrder to a cancel request
744        let mut cancel_requests = Vec::new();
745        for cancel_cmd in &command.cancels {
746            let client_order_id = cancel_cmd.client_order_id.inner();
747            let symbol = cancel_cmd.instrument_id.symbol.inner();
748
749            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
750                Ok(req) => cancel_requests.push(req),
751                Err(e) => {
752                    tracing::warn!(
753                        "Failed to convert order {} to cancel request: {}",
754                        client_order_id,
755                        e
756                    );
757                    continue;
758                }
759            }
760        }
761
762        if cancel_requests.is_empty() {
763            tracing::warn!("No valid cancel requests in batch");
764            return Ok(());
765        }
766
767        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
768
769        // Send batch cancel request via HTTP API
770        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
771        let http_client = self.http_client.clone();
772        let runtime = get_runtime();
773        runtime.spawn(async move {
774            if let Err(e) = http_client.post_action(&action).await {
775                tracing::warn!("Failed to send batch cancel orders request: {e}");
776            }
777        });
778
779        Ok(())
780    }
781
782    fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
783        tracing::debug!("Querying account: {:?}", command);
784
785        // Use existing infrastructure to refresh account state
786        let runtime = get_runtime();
787        runtime.block_on(async {
788            if let Err(e) = self.refresh_account_state().await {
789                tracing::warn!("Failed to query account state: {e}");
790            }
791        });
792
793        Ok(())
794    }
795
796    fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
797        tracing::debug!("Querying order: {:?}", command);
798
799        // Get venue order ID from cache
800        let cache = self.core.cache().borrow();
801        let venue_order_id = cache.venue_order_id(&command.client_order_id);
802
803        let venue_order_id = match venue_order_id {
804            Some(oid) => *oid,
805            None => {
806                tracing::warn!(
807                    "No venue order ID found for client order {}",
808                    command.client_order_id
809                );
810                return Ok(());
811            }
812        };
813        drop(cache);
814
815        // Parse venue order ID to u64
816        let oid = match u64::from_str(venue_order_id.as_ref()) {
817            Ok(id) => id,
818            Err(e) => {
819                tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
820                return Ok(());
821            }
822        };
823
824        // Get user address for the query
825        let user_address = self.get_user_address()?;
826
827        // Query order status via HTTP API
828        // Note: The WebSocket connection is the authoritative source for order updates,
829        // this is primarily for reconciliation or when WebSocket is unavailable
830        let http_client = self.http_client.clone();
831        let runtime = get_runtime();
832        runtime.spawn(async move {
833            match http_client.info_order_status(&user_address, oid).await {
834                Ok(status) => {
835                    tracing::debug!("Order status for oid {}: {:?}", oid, status);
836                }
837                Err(e) => {
838                    tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
839                }
840            }
841        });
842
843        Ok(())
844    }
845
846    async fn connect(&mut self) -> anyhow::Result<()> {
847        if self.connected {
848            return Ok(());
849        }
850
851        tracing::info!("Connecting Hyperliquid execution client");
852
853        // Ensure instruments are initialized
854        self.ensure_instruments_initialized_async().await?;
855
856        // Connect WebSocket client
857        self.ws_client.connect().await?;
858
859        // Subscribe to user-specific order updates and fills
860        let user_address = self.get_user_address()?;
861        self.ws_client
862            .subscribe_all_user_channels(&user_address)
863            .await?;
864
865        // Initialize account state
866        self.refresh_account_state().await?;
867
868        self.connected = true;
869        self.core.set_connected(true);
870
871        // Start WebSocket stream for execution updates
872        if let Err(e) = self.start_ws_stream().await {
873            tracing::warn!("Failed to start WebSocket stream: {e}");
874        }
875
876        tracing::info!(client_id = %self.core.client_id, "Connected");
877        Ok(())
878    }
879
880    async fn disconnect(&mut self) -> anyhow::Result<()> {
881        if !self.connected {
882            return Ok(());
883        }
884
885        tracing::info!("Disconnecting Hyperliquid execution client");
886
887        // Disconnect WebSocket
888        self.ws_client.disconnect().await?;
889
890        // Abort any pending tasks
891        self.abort_pending_tasks();
892
893        self.connected = false;
894        self.core.set_connected(false);
895
896        tracing::info!(client_id = %self.core.client_id, "Disconnected");
897        Ok(())
898    }
899
900    async fn generate_order_status_report(
901        &self,
902        _cmd: &GenerateOrderStatusReport,
903    ) -> anyhow::Result<Option<OrderStatusReport>> {
904        // NOTE: Single order status report generation requires instrument cache integration.
905        // The HTTP client methods and parsing functions are implemented and ready to use.
906        // When implemented: query via info_order_status(), parse with parse_order_status_report_from_basic().
907        tracing::warn!("generate_order_status_report not yet fully implemented");
908        Ok(None)
909    }
910
911    async fn generate_order_status_reports(
912        &self,
913        cmd: &GenerateOrderStatusReports,
914    ) -> anyhow::Result<Vec<OrderStatusReport>> {
915        let user_address = self.get_user_address()?;
916
917        let reports = self
918            .http_client
919            .request_order_status_reports(&user_address, cmd.instrument_id)
920            .await
921            .context("failed to generate order status reports")?;
922
923        // Filter by open_only if specified
924        let reports = if cmd.open_only {
925            reports
926                .into_iter()
927                .filter(|r| r.order_status.is_open())
928                .collect()
929        } else {
930            reports
931        };
932
933        // Filter by time range if specified
934        let reports = match (cmd.start, cmd.end) {
935            (Some(start), Some(end)) => reports
936                .into_iter()
937                .filter(|r| r.ts_last >= start && r.ts_last <= end)
938                .collect(),
939            (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
940            (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
941            (None, None) => reports,
942        };
943
944        tracing::info!("Generated {} order status reports", reports.len());
945        Ok(reports)
946    }
947
948    async fn generate_fill_reports(
949        &self,
950        cmd: GenerateFillReports,
951    ) -> anyhow::Result<Vec<FillReport>> {
952        let user_address = self.get_user_address()?;
953
954        let reports = self
955            .http_client
956            .request_fill_reports(&user_address, cmd.instrument_id)
957            .await
958            .context("failed to generate fill reports")?;
959
960        // Filter by time range if specified
961        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
962            reports
963                .into_iter()
964                .filter(|r| r.ts_event >= start && r.ts_event <= end)
965                .collect()
966        } else if let Some(start) = cmd.start {
967            reports
968                .into_iter()
969                .filter(|r| r.ts_event >= start)
970                .collect()
971        } else if let Some(end) = cmd.end {
972            reports.into_iter().filter(|r| r.ts_event <= end).collect()
973        } else {
974            reports
975        };
976
977        tracing::info!("Generated {} fill reports", reports.len());
978        Ok(reports)
979    }
980
981    async fn generate_position_status_reports(
982        &self,
983        cmd: &GeneratePositionStatusReports,
984    ) -> anyhow::Result<Vec<PositionStatusReport>> {
985        let user_address = self.get_user_address()?;
986
987        let reports = self
988            .http_client
989            .request_position_status_reports(&user_address, cmd.instrument_id)
990            .await
991            .context("failed to generate position status reports")?;
992
993        tracing::info!("Generated {} position status reports", reports.len());
994        Ok(reports)
995    }
996
997    async fn generate_mass_status(
998        &self,
999        lookback_mins: Option<u64>,
1000    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1001        tracing::warn!(
1002            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
1003        );
1004        // Full implementation would require:
1005        // 1. Query all orders within lookback window
1006        // 2. Query all fills within lookback window
1007        // 3. Query all positions
1008        // 4. Combine into ExecutionMassStatus
1009        Ok(None)
1010    }
1011}
1012
1013impl HyperliquidExecutionClient {
1014    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1015        {
1016            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1017            if handle_guard.is_some() {
1018                return Ok(());
1019            }
1020        }
1021
1022        let user_address = self.get_user_address()?;
1023        let _account_id = self.core.account_id;
1024        let mut ws_client = self.ws_client.clone();
1025
1026        let instruments = self
1027            .http_client
1028            .request_instruments()
1029            .await
1030            .unwrap_or_default();
1031
1032        for instrument in instruments {
1033            ws_client.cache_instrument(instrument);
1034        }
1035
1036        let runtime = get_runtime();
1037        let handle = runtime.spawn(async move {
1038            if let Err(e) = ws_client.connect().await {
1039                tracing::warn!("Failed to connect WebSocket: {e}");
1040                return;
1041            }
1042
1043            if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1044                tracing::warn!("Failed to subscribe to order updates: {e}");
1045                return;
1046            }
1047
1048            if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1049                tracing::warn!("Failed to subscribe to user events: {e}");
1050                return;
1051            }
1052
1053            tracing::info!("Subscribed to Hyperliquid execution updates");
1054
1055            let _clock = get_atomic_clock_realtime();
1056
1057            loop {
1058                let event = ws_client.next_event().await;
1059
1060                match event {
1061                    Some(msg) => {
1062                        match msg {
1063                            NautilusWsMessage::ExecutionReports(reports) => {
1064                                // Handler already parsed the messages, just dispatch them
1065                                for report in reports {
1066                                    dispatch_execution_report(report);
1067                                }
1068                            }
1069                            NautilusWsMessage::Reconnected => {
1070                                tracing::info!("WebSocket reconnected");
1071                                // TODO: Resubscribe to user channels if needed
1072                            }
1073                            NautilusWsMessage::Error(e) => {
1074                                tracing::error!("WebSocket error: {e}");
1075                            }
1076                            // Handled by data client
1077                            NautilusWsMessage::Trades(_)
1078                            | NautilusWsMessage::Quote(_)
1079                            | NautilusWsMessage::Deltas(_)
1080                            | NautilusWsMessage::Candle(_)
1081                            | NautilusWsMessage::MarkPrice(_)
1082                            | NautilusWsMessage::IndexPrice(_)
1083                            | NautilusWsMessage::FundingRate(_) => {}
1084                        }
1085                    }
1086                    None => {
1087                        tracing::warn!("WebSocket next_event returned None");
1088                        break;
1089                    }
1090                }
1091            }
1092        });
1093
1094        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1095        tracing::info!("Hyperliquid WebSocket execution stream started");
1096        Ok(())
1097    }
1098}
1099
1100fn dispatch_execution_report(report: ExecutionReport) {
1101    let sender = get_exec_event_sender();
1102    match report {
1103        ExecutionReport::Order(order_report) => {
1104            let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1105            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1106                tracing::warn!("Failed to send order status report: {e}");
1107            }
1108        }
1109        ExecutionReport::Fill(fill_report) => {
1110            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1111            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1112                tracing::warn!("Failed to send fill report: {e}");
1113            }
1114        }
1115    }
1116}