nautilus_hyperliquid/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use nautilus_common::{
23    live::{runner::get_exec_event_sender, runtime::get_runtime},
24    messages::{
25        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
26        execution::{
27            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28            GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
29            QueryOrder, SubmitOrder, SubmitOrderList,
30        },
31    },
32};
33use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
34use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
35use nautilus_live::execution::client::LiveExecutionClient;
36use nautilus_model::{
37    accounts::AccountAny,
38    enums::{OmsType, OrderType},
39    identifiers::{AccountId, ClientId, Venue},
40    orders::{Order, any::OrderAny},
41    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
42    types::{AccountBalance, MarginBalance},
43};
44use serde_json;
45use tokio::task::JoinHandle;
46
47use crate::{
48    common::{
49        HyperliquidProductType,
50        consts::HYPERLIQUID_VENUE,
51        credential::Secrets,
52        parse::{
53            client_order_id_to_cancel_request, extract_error_message, is_response_successful,
54            order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
55        },
56    },
57    config::HyperliquidExecClientConfig,
58    http::{client::HyperliquidHttpClient, models::ClearinghouseState, query::ExchangeAction},
59    websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
60};
61
62#[derive(Debug)]
63pub struct HyperliquidExecutionClient {
64    core: ExecutionClientCore,
65    config: HyperliquidExecClientConfig,
66    http_client: HyperliquidHttpClient,
67    ws_client: HyperliquidWebSocketClient,
68    started: bool,
69    connected: bool,
70    instruments_initialized: bool,
71    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
72    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
73}
74
75impl HyperliquidExecutionClient {
76    /// Returns a reference to the configuration.
77    pub fn config(&self) -> &HyperliquidExecClientConfig {
78        &self.config
79    }
80
81    /// Validates order before submission to catch issues early.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the order cannot be submitted to Hyperliquid.
86    ///
87    /// # Supported Order Types
88    ///
89    /// - `Market`: Standard market orders
90    /// - `Limit`: Limit orders with GTC/IOC/ALO time-in-force
91    /// - `StopMarket`: Stop loss / protective stop with market execution
92    /// - `StopLimit`: Stop loss / protective stop with limit price
93    /// - `MarketIfTouched`: Profit taking / entry order with market execution
94    /// - `LimitIfTouched`: Profit taking / entry order with limit price
95    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
96        // Check if instrument symbol is supported
97        // Hyperliquid instruments: {base}-USD-PERP or {base}-{quote}-SPOT
98        let instrument_id = order.instrument_id();
99        let symbol = instrument_id.symbol.as_str();
100        if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
101            anyhow::bail!(
102                "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
103            );
104        }
105
106        // Check if order type is supported
107        match order.order_type() {
108            OrderType::Market
109            | OrderType::Limit
110            | OrderType::StopMarket
111            | OrderType::StopLimit
112            | OrderType::MarketIfTouched
113            | OrderType::LimitIfTouched => {}
114            _ => anyhow::bail!(
115                "Unsupported order type for Hyperliquid: {:?}",
116                order.order_type()
117            ),
118        }
119
120        // Check if conditional orders have trigger price
121        if matches!(
122            order.order_type(),
123            OrderType::StopMarket
124                | OrderType::StopLimit
125                | OrderType::MarketIfTouched
126                | OrderType::LimitIfTouched
127        ) && order.trigger_price().is_none()
128        {
129            anyhow::bail!(
130                "Conditional orders require a trigger price for Hyperliquid: {:?}",
131                order.order_type()
132            );
133        }
134
135        // Check if limit-based orders have price
136        if matches!(
137            order.order_type(),
138            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
139        ) && order.price().is_none()
140        {
141            anyhow::bail!(
142                "Limit orders require a limit price for Hyperliquid: {:?}",
143                order.order_type()
144            );
145        }
146
147        Ok(())
148    }
149
150    /// Creates a new [`HyperliquidExecutionClient`].
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if either the HTTP or WebSocket client fail to construct.
155    pub fn new(
156        core: ExecutionClientCore,
157        config: HyperliquidExecClientConfig,
158    ) -> anyhow::Result<Self> {
159        if !config.has_credentials() {
160            anyhow::bail!("Hyperliquid execution client requires private key");
161        }
162
163        let secrets = Secrets::from_json(&format!(
164            r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
165            config.private_key, config.is_testnet
166        ))
167        .context("failed to create secrets from private key")?;
168
169        let http_client = HyperliquidHttpClient::with_credentials(
170            &secrets,
171            Some(config.http_timeout_secs),
172            config.http_proxy_url.clone(),
173        )
174        .context("failed to create Hyperliquid HTTP client")?;
175
176        // Create WebSocket client (will connect when needed)
177        // Note: For execution WebSocket (private account messages), product type is less critical
178        // since messages are account-scoped. Defaulting to Perp.
179        let ws_client = HyperliquidWebSocketClient::new(
180            None,
181            config.is_testnet,
182            HyperliquidProductType::Perp,
183            Some(core.account_id),
184        );
185
186        Ok(Self {
187            core,
188            config,
189            http_client,
190            ws_client,
191            started: false,
192            connected: false,
193            instruments_initialized: false,
194            pending_tasks: Mutex::new(Vec::new()),
195            ws_stream_handle: Mutex::new(None),
196        })
197    }
198
199    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
200        if self.instruments_initialized {
201            return Ok(());
202        }
203
204        let instruments = self
205            .http_client
206            .request_instruments()
207            .await
208            .context("failed to request Hyperliquid instruments")?;
209
210        if instruments.is_empty() {
211            tracing::warn!(
212                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
213            );
214        } else {
215            tracing::info!("Initialized {} instruments", instruments.len());
216
217            for instrument in &instruments {
218                self.http_client.cache_instrument(instrument.clone());
219            }
220        }
221
222        self.instruments_initialized = true;
223        Ok(())
224    }
225
226    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
227        if self.instruments_initialized {
228            return Ok(());
229        }
230
231        let runtime = get_runtime();
232        runtime.block_on(self.ensure_instruments_initialized_async())
233    }
234
235    async fn refresh_account_state(&self) -> anyhow::Result<()> {
236        // Get account information from Hyperliquid using the user address
237        // We need to derive the user address from the private key in the config
238        let user_address = self.get_user_address()?;
239
240        // Use vault address if configured, otherwise use user address
241        let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
242
243        // Query clearinghouseState endpoint to get balances and margin info
244        let clearinghouse_state = self
245            .http_client
246            .info_clearinghouse_state(account_address)
247            .await
248            .context("failed to fetch clearinghouse state")?;
249
250        // Deserialize the response
251        let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
252            .context("failed to deserialize clearinghouse state")?;
253
254        tracing::debug!(
255            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
256            state.cross_margin_summary,
257            state.asset_positions.len()
258        );
259
260        // Parse balances and margins from cross margin summary
261        if let Some(ref cross_margin_summary) = state.cross_margin_summary {
262            let (balances, margins) =
263                crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
264                    .context("failed to parse account balances and margins")?;
265
266            let ts_event = if let Some(time_ms) = state.time {
267                nautilus_core::UnixNanos::from(time_ms * 1_000_000)
268            } else {
269                nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
270            };
271
272            // Generate account state event
273            self.core.generate_account_state(
274                balances, margins, true, // reported
275                ts_event,
276            )?;
277
278            tracing::info!("Account state updated successfully");
279        } else {
280            tracing::warn!("No cross margin summary in clearinghouse state");
281        }
282
283        Ok(())
284    }
285
286    fn get_user_address(&self) -> anyhow::Result<String> {
287        let address = self
288            .http_client
289            .get_user_address()
290            .context("failed to get user address from HTTP client")?;
291
292        Ok(address)
293    }
294
295    fn spawn_task<F>(&self, description: &'static str, fut: F)
296    where
297        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
298    {
299        let runtime = get_runtime();
300        let handle = runtime.spawn(async move {
301            if let Err(e) = fut.await {
302                tracing::warn!("{description} failed: {e:?}");
303            }
304        });
305
306        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
307        tasks.retain(|handle| !handle.is_finished());
308        tasks.push(handle);
309    }
310
311    fn abort_pending_tasks(&self) {
312        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
313        for handle in tasks.drain(..) {
314            handle.abort();
315        }
316    }
317
318    fn update_account_state(&self) -> anyhow::Result<()> {
319        let runtime = get_runtime();
320        runtime.block_on(self.refresh_account_state())
321    }
322}
323
324#[async_trait(?Send)]
325impl ExecutionClient for HyperliquidExecutionClient {
326    fn is_connected(&self) -> bool {
327        self.connected
328    }
329
330    fn client_id(&self) -> ClientId {
331        self.core.client_id
332    }
333
334    fn account_id(&self) -> AccountId {
335        self.core.account_id
336    }
337
338    fn venue(&self) -> Venue {
339        *HYPERLIQUID_VENUE
340    }
341
342    fn oms_type(&self) -> OmsType {
343        self.core.oms_type
344    }
345
346    fn get_account(&self) -> Option<AccountAny> {
347        self.core.get_account()
348    }
349
350    fn generate_account_state(
351        &self,
352        balances: Vec<AccountBalance>,
353        margins: Vec<MarginBalance>,
354        reported: bool,
355        ts_event: UnixNanos,
356    ) -> anyhow::Result<()> {
357        self.core
358            .generate_account_state(balances, margins, reported, ts_event)
359    }
360
361    fn start(&mut self) -> anyhow::Result<()> {
362        if self.started {
363            return Ok(());
364        }
365
366        tracing::info!(
367            client_id = %self.core.client_id,
368            account_id = %self.core.account_id,
369            is_testnet = self.config.is_testnet,
370            vault_address = ?self.config.vault_address,
371            http_proxy_url = ?self.config.http_proxy_url,
372            ws_proxy_url = ?self.config.ws_proxy_url,
373            "Starting Hyperliquid execution client"
374        );
375
376        // Ensure instruments are initialized
377        self.ensure_instruments_initialized()?;
378
379        // Initialize account state
380        if let Err(e) = self.update_account_state() {
381            tracing::warn!("Failed to initialize account state: {e}");
382        }
383
384        self.connected = true;
385        self.started = true;
386
387        // Start WebSocket stream for execution updates
388        if let Err(e) = get_runtime().block_on(self.start_ws_stream()) {
389            tracing::warn!("Failed to start WebSocket stream: {e}");
390        }
391
392        tracing::info!("Hyperliquid execution client started");
393        Ok(())
394    }
395    fn stop(&mut self) -> anyhow::Result<()> {
396        if !self.started {
397            return Ok(());
398        }
399
400        tracing::info!("Stopping Hyperliquid execution client");
401
402        // Stop WebSocket stream
403        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
404            handle.abort();
405        }
406
407        // Abort any pending tasks
408        self.abort_pending_tasks();
409
410        // Disconnect WebSocket
411        if self.connected {
412            let runtime = get_runtime();
413            runtime.block_on(async {
414                if let Err(e) = self.ws_client.disconnect().await {
415                    tracing::warn!("Error disconnecting WebSocket client: {e}");
416                }
417            });
418        }
419
420        self.connected = false;
421        self.started = false;
422
423        tracing::info!("Hyperliquid execution client stopped");
424        Ok(())
425    }
426
427    fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
428        let order = &command.order;
429
430        if order.is_closed() {
431            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
432            return Ok(());
433        }
434
435        if let Err(e) = self.validate_order_submission(order) {
436            self.core.generate_order_rejected(
437                order.strategy_id(),
438                order.instrument_id(),
439                order.client_order_id(),
440                &format!("validation-error: {e}"),
441                command.ts_init,
442                false,
443            );
444            return Err(e);
445        }
446
447        self.core.generate_order_submitted(
448            order.strategy_id(),
449            order.instrument_id(),
450            order.client_order_id(),
451            command.ts_init,
452        );
453
454        let http_client = self.http_client.clone();
455        let order_clone = order.clone();
456
457        self.spawn_task("submit_order", async move {
458            match order_any_to_hyperliquid_request(&order_clone) {
459                Ok(hyperliquid_order) => {
460                    // Create exchange action for order placement with typed struct
461                    let action = ExchangeAction::order(vec![hyperliquid_order]);
462
463                    match http_client.post_action(&action).await {
464                        Ok(response) => {
465                            if is_response_successful(&response) {
466                                tracing::info!("Order submitted successfully: {:?}", response);
467                                // Order acceptance/rejection events will be generated from WebSocket updates
468                                // which provide the venue_order_id and definitive status
469                            } else {
470                                let error_msg = extract_error_message(&response);
471                                tracing::warn!(
472                                    "Order submission rejected by exchange: {}",
473                                    error_msg
474                                );
475                                // Order rejection event will be generated from WebSocket updates
476                            }
477                        }
478                        Err(e) => {
479                            tracing::warn!("Order submission HTTP request failed: {e}");
480                            // WebSocket reconnection and order reconciliation will handle recovery
481                        }
482                    }
483                }
484                Err(e) => {
485                    tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
486                    // This indicates a client-side bug or unsupported order configuration
487                }
488            }
489
490            Ok(())
491        });
492
493        Ok(())
494    }
495
496    fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
497        tracing::debug!(
498            "Submitting order list with {} orders",
499            command.order_list.orders.len()
500        );
501
502        let http_client = self.http_client.clone();
503        let orders: Vec<OrderAny> = command.order_list.orders.clone();
504
505        // Generate submitted events for all orders
506        for order in &orders {
507            self.core.generate_order_submitted(
508                order.strategy_id(),
509                order.instrument_id(),
510                order.client_order_id(),
511                command.ts_init,
512            );
513        }
514
515        self.spawn_task("submit_order_list", async move {
516            // Convert all orders to Hyperliquid format
517            let order_refs: Vec<&OrderAny> = orders.iter().collect();
518            match orders_to_hyperliquid_requests(&order_refs) {
519                Ok(hyperliquid_orders) => {
520                    // Create exchange action for order placement with typed struct
521                    let action = ExchangeAction::order(hyperliquid_orders);
522                    match http_client.post_action(&action).await {
523                        Ok(response) => {
524                            if is_response_successful(&response) {
525                                tracing::info!("Order list submitted successfully: {:?}", response);
526                                // Order acceptance/rejection events will be generated from WebSocket updates
527                            } else {
528                                let error_msg = extract_error_message(&response);
529                                tracing::warn!(
530                                    "Order list submission rejected by exchange: {}",
531                                    error_msg
532                                );
533                                // Individual order rejection events will be generated from WebSocket updates
534                            }
535                        }
536                        Err(e) => {
537                            tracing::warn!("Order list submission HTTP request failed: {e}");
538                            // WebSocket reconciliation will handle recovery
539                        }
540                    }
541                }
542                Err(e) => {
543                    tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
544                }
545            }
546
547            Ok(())
548        });
549
550        Ok(())
551    }
552
553    fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
554        tracing::debug!("Modifying order: {:?}", command);
555
556        // Parse venue_order_id as u64
557        let oid: u64 = match command.venue_order_id.as_str().parse() {
558            Ok(id) => id,
559            Err(e) => {
560                tracing::warn!(
561                    "Failed to parse venue_order_id '{}' as u64: {}",
562                    command.venue_order_id,
563                    e
564                );
565                return Ok(());
566            }
567        };
568
569        let http_client = self.http_client.clone();
570        let price = command.price;
571        let quantity = command.quantity;
572        let symbol = command.instrument_id.symbol.inner();
573
574        self.spawn_task("modify_order", async move {
575            use crate::{
576                common::parse::extract_asset_id_from_symbol,
577                http::models::HyperliquidExecModifyOrderRequest,
578            };
579
580            // Extract asset ID from instrument symbol
581            let asset = match extract_asset_id_from_symbol(&symbol) {
582                Ok(asset) => asset,
583                Err(e) => {
584                    tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
585                    return Ok(());
586                }
587            };
588
589            // Build typed modify request with new price and/or quantity
590            let modify_request = HyperliquidExecModifyOrderRequest {
591                asset,
592                oid,
593                price: price.map(|p| (*p).into()),
594                size: quantity.map(|q| (*q).into()),
595                reduce_only: None,
596                kind: None,
597            };
598
599            let action = ExchangeAction::modify(oid, modify_request);
600
601            match http_client.post_action(&action).await {
602                Ok(response) => {
603                    if is_response_successful(&response) {
604                        tracing::info!("Order modified successfully: {:?}", response);
605                        // Order update events will be generated from WebSocket updates
606                    } else {
607                        let error_msg = extract_error_message(&response);
608                        tracing::warn!("Order modification rejected by exchange: {}", error_msg);
609                        // Order modify rejected events will be generated from WebSocket updates
610                    }
611                }
612                Err(e) => {
613                    tracing::warn!("Order modification HTTP request failed: {e}");
614                    // WebSocket reconciliation will handle recovery
615                }
616            }
617
618            Ok(())
619        });
620
621        Ok(())
622    }
623
624    fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
625        tracing::debug!("Cancelling order: {:?}", command);
626
627        let http_client = self.http_client.clone();
628        let client_order_id = command.client_order_id.inner();
629        let symbol = command.instrument_id.symbol.inner();
630
631        self.spawn_task("cancel_order", async move {
632            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
633                Ok(cancel_request) => {
634                    // Create exchange action for order cancellation with typed struct
635                    let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
636                    match http_client.post_action(&action).await {
637                        Ok(response) => {
638                            if is_response_successful(&response) {
639                                tracing::info!("Order cancelled successfully: {:?}", response);
640                                // Order cancelled events will be generated from WebSocket updates
641                                // which provide definitive confirmation and venue_order_id
642                            } else {
643                                let error_msg = extract_error_message(&response);
644                                tracing::warn!(
645                                    "Order cancellation rejected by exchange: {}",
646                                    error_msg
647                                );
648                                // Order cancel rejected events will be generated from WebSocket updates
649                            }
650                        }
651                        Err(e) => {
652                            tracing::warn!("Order cancellation HTTP request failed: {e}");
653                            // WebSocket reconnection and reconciliation will handle recovery
654                        }
655                    }
656                }
657                Err(e) => {
658                    tracing::warn!(
659                        "Failed to convert order to Hyperliquid cancel format: {:?}",
660                        e
661                    );
662                }
663            }
664
665            Ok(())
666        });
667
668        Ok(())
669    }
670
671    fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
672        tracing::debug!("Cancelling all orders: {:?}", command);
673
674        // Query cache for all open orders matching the instrument and side
675        let cache = self.core.cache().borrow();
676        let open_orders = cache.orders_open(
677            Some(&self.core.venue),
678            Some(&command.instrument_id),
679            None,
680            Some(command.order_side),
681        );
682
683        if open_orders.is_empty() {
684            tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
685            return Ok(());
686        }
687
688        // Convert orders to cancel requests
689        let mut cancel_requests = Vec::new();
690        let symbol = command.instrument_id.symbol.inner();
691        for order in open_orders {
692            let client_order_id = order.client_order_id().inner();
693
694            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
695                Ok(req) => cancel_requests.push(req),
696                Err(e) => {
697                    tracing::warn!(
698                        "Failed to convert order {} to cancel request: {}",
699                        client_order_id,
700                        e
701                    );
702                    continue;
703                }
704            }
705        }
706
707        if cancel_requests.is_empty() {
708            tracing::debug!("No valid cancel requests to send");
709            return Ok(());
710        }
711
712        // Create exchange action for cancellation with typed struct
713        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
714
715        // Send cancel request via HTTP API
716        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
717        let http_client = self.http_client.clone();
718        let runtime = get_runtime();
719        runtime.spawn(async move {
720            if let Err(e) = http_client.post_action(&action).await {
721                tracing::warn!("Failed to send cancel all orders request: {e}");
722            }
723        });
724
725        Ok(())
726    }
727
728    fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
729        tracing::debug!("Batch cancelling orders: {:?}", command);
730
731        if command.cancels.is_empty() {
732            tracing::debug!("No orders to cancel in batch");
733            return Ok(());
734        }
735
736        // Convert each CancelOrder to a cancel request
737        let mut cancel_requests = Vec::new();
738        for cancel_cmd in &command.cancels {
739            let client_order_id = cancel_cmd.client_order_id.inner();
740            let symbol = cancel_cmd.instrument_id.symbol.inner();
741
742            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
743                Ok(req) => cancel_requests.push(req),
744                Err(e) => {
745                    tracing::warn!(
746                        "Failed to convert order {} to cancel request: {}",
747                        client_order_id,
748                        e
749                    );
750                    continue;
751                }
752            }
753        }
754
755        if cancel_requests.is_empty() {
756            tracing::warn!("No valid cancel requests in batch");
757            return Ok(());
758        }
759
760        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
761
762        // Send batch cancel request via HTTP API
763        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
764        let http_client = self.http_client.clone();
765        let runtime = get_runtime();
766        runtime.spawn(async move {
767            if let Err(e) = http_client.post_action(&action).await {
768                tracing::warn!("Failed to send batch cancel orders request: {e}");
769            }
770        });
771
772        Ok(())
773    }
774
775    fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
776        tracing::debug!("Querying account: {:?}", command);
777
778        // Use existing infrastructure to refresh account state
779        let runtime = get_runtime();
780        runtime.block_on(async {
781            if let Err(e) = self.refresh_account_state().await {
782                tracing::warn!("Failed to query account state: {e}");
783            }
784        });
785
786        Ok(())
787    }
788
789    fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
790        tracing::debug!("Querying order: {:?}", command);
791
792        // Get venue order ID from cache
793        let cache = self.core.cache().borrow();
794        let venue_order_id = cache.venue_order_id(&command.client_order_id);
795
796        let venue_order_id = match venue_order_id {
797            Some(oid) => *oid,
798            None => {
799                tracing::warn!(
800                    "No venue order ID found for client order {}",
801                    command.client_order_id
802                );
803                return Ok(());
804            }
805        };
806        drop(cache);
807
808        // Parse venue order ID to u64
809        let oid = match u64::from_str(venue_order_id.as_ref()) {
810            Ok(id) => id,
811            Err(e) => {
812                tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
813                return Ok(());
814            }
815        };
816
817        // Get user address for the query
818        let user_address = self.get_user_address()?;
819
820        // Query order status via HTTP API
821        // Note: The WebSocket connection is the authoritative source for order updates,
822        // this is primarily for reconciliation or when WebSocket is unavailable
823        let http_client = self.http_client.clone();
824        let runtime = get_runtime();
825        runtime.spawn(async move {
826            match http_client.info_order_status(&user_address, oid).await {
827                Ok(status) => {
828                    tracing::debug!("Order status for oid {}: {:?}", oid, status);
829                }
830                Err(e) => {
831                    tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
832                }
833            }
834        });
835
836        Ok(())
837    }
838
839    async fn connect(&mut self) -> anyhow::Result<()> {
840        if self.connected {
841            return Ok(());
842        }
843
844        tracing::info!("Connecting Hyperliquid execution client");
845
846        // Ensure instruments are initialized
847        self.ensure_instruments_initialized_async().await?;
848
849        // Connect WebSocket client
850        self.ws_client.connect().await?;
851
852        // Subscribe to user-specific order updates and fills
853        let user_address = self.get_user_address()?;
854        self.ws_client
855            .subscribe_all_user_channels(&user_address)
856            .await?;
857
858        // Initialize account state
859        self.refresh_account_state().await?;
860
861        self.connected = true;
862        self.core.set_connected(true);
863
864        // Start WebSocket stream for execution updates
865        if let Err(e) = self.start_ws_stream().await {
866            tracing::warn!("Failed to start WebSocket stream: {e}");
867        }
868
869        tracing::info!(client_id = %self.core.client_id, "Connected");
870        Ok(())
871    }
872
873    async fn disconnect(&mut self) -> anyhow::Result<()> {
874        if !self.connected {
875            return Ok(());
876        }
877
878        tracing::info!("Disconnecting Hyperliquid execution client");
879
880        // Disconnect WebSocket
881        self.ws_client.disconnect().await?;
882
883        // Abort any pending tasks
884        self.abort_pending_tasks();
885
886        self.connected = false;
887        self.core.set_connected(false);
888
889        tracing::info!(client_id = %self.core.client_id, "Disconnected");
890        Ok(())
891    }
892}
893
894#[async_trait(?Send)]
895impl LiveExecutionClient for HyperliquidExecutionClient {
896    async fn generate_order_status_report(
897        &self,
898        _cmd: &GenerateOrderStatusReport,
899    ) -> anyhow::Result<Option<OrderStatusReport>> {
900        // NOTE: Single order status report generation requires instrument cache integration.
901        // The HTTP client methods and parsing functions are implemented and ready to use.
902        // When implemented: query via info_order_status(), parse with parse_order_status_report_from_basic().
903        tracing::warn!("generate_order_status_report not yet fully implemented");
904        Ok(None)
905    }
906
907    async fn generate_order_status_reports(
908        &self,
909        cmd: &GenerateOrderStatusReport,
910    ) -> anyhow::Result<Vec<OrderStatusReport>> {
911        let user_address = self.get_user_address()?;
912
913        let reports = self
914            .http_client
915            .request_order_status_reports(&user_address, cmd.instrument_id)
916            .await
917            .context("failed to generate order status reports")?;
918
919        // Filter by client_order_id if specified
920        let reports = if let Some(client_order_id) = cmd.client_order_id {
921            reports
922                .into_iter()
923                .filter(|r| r.client_order_id == Some(client_order_id))
924                .collect()
925        } else {
926            reports
927        };
928
929        // Note: cmd.venue_order_id is Option<ClientOrderId> in the struct definition,
930        // but report venue_order_id is VenueOrderId - type mismatch prevents filtering here
931
932        tracing::info!("Generated {} order status reports", reports.len());
933        Ok(reports)
934    }
935
936    async fn generate_fill_reports(
937        &self,
938        cmd: GenerateFillReports,
939    ) -> anyhow::Result<Vec<FillReport>> {
940        let user_address = self.get_user_address()?;
941
942        let reports = self
943            .http_client
944            .request_fill_reports(&user_address, cmd.instrument_id)
945            .await
946            .context("failed to generate fill reports")?;
947
948        // Filter by time range if specified
949        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
950            reports
951                .into_iter()
952                .filter(|r| r.ts_event >= start && r.ts_event <= end)
953                .collect()
954        } else if let Some(start) = cmd.start {
955            reports
956                .into_iter()
957                .filter(|r| r.ts_event >= start)
958                .collect()
959        } else if let Some(end) = cmd.end {
960            reports.into_iter().filter(|r| r.ts_event <= end).collect()
961        } else {
962            reports
963        };
964
965        tracing::info!("Generated {} fill reports", reports.len());
966        Ok(reports)
967    }
968
969    async fn generate_position_status_reports(
970        &self,
971        cmd: &GeneratePositionReports,
972    ) -> anyhow::Result<Vec<PositionStatusReport>> {
973        let user_address = self.get_user_address()?;
974
975        let reports = self
976            .http_client
977            .request_position_status_reports(&user_address, cmd.instrument_id)
978            .await
979            .context("failed to generate position status reports")?;
980
981        tracing::info!("Generated {} position status reports", reports.len());
982        Ok(reports)
983    }
984
985    async fn generate_mass_status(
986        &self,
987        lookback_mins: Option<u64>,
988    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
989        tracing::warn!(
990            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
991        );
992        // Full implementation would require:
993        // 1. Query all orders within lookback window
994        // 2. Query all fills within lookback window
995        // 3. Query all positions
996        // 4. Combine into ExecutionMassStatus
997        Ok(None)
998    }
999}
1000
1001impl HyperliquidExecutionClient {
1002    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1003        {
1004            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1005            if handle_guard.is_some() {
1006                return Ok(());
1007            }
1008        }
1009
1010        let user_address = self.get_user_address()?;
1011        let _account_id = self.core.account_id;
1012        let mut ws_client = self.ws_client.clone();
1013
1014        let instruments = self
1015            .http_client
1016            .request_instruments()
1017            .await
1018            .unwrap_or_default();
1019
1020        for instrument in instruments {
1021            ws_client.cache_instrument(instrument);
1022        }
1023
1024        let runtime = get_runtime();
1025        let handle = runtime.spawn(async move {
1026            if let Err(e) = ws_client.connect().await {
1027                tracing::warn!("Failed to connect WebSocket: {e}");
1028                return;
1029            }
1030
1031            if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1032                tracing::warn!("Failed to subscribe to order updates: {e}");
1033                return;
1034            }
1035
1036            if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1037                tracing::warn!("Failed to subscribe to user events: {e}");
1038                return;
1039            }
1040
1041            tracing::info!("Subscribed to Hyperliquid execution updates");
1042
1043            let _clock = get_atomic_clock_realtime();
1044
1045            loop {
1046                let event = ws_client.next_event().await;
1047
1048                match event {
1049                    Some(msg) => {
1050                        match msg {
1051                            NautilusWsMessage::ExecutionReports(reports) => {
1052                                // Handler already parsed the messages, just dispatch them
1053                                for report in reports {
1054                                    dispatch_execution_report(report);
1055                                }
1056                            }
1057                            NautilusWsMessage::Reconnected => {
1058                                tracing::info!("WebSocket reconnected");
1059                                // TODO: Resubscribe to user channels if needed
1060                            }
1061                            NautilusWsMessage::Error(e) => {
1062                                tracing::error!("WebSocket error: {e}");
1063                            }
1064                            // Handled by data client
1065                            NautilusWsMessage::Trades(_)
1066                            | NautilusWsMessage::Quote(_)
1067                            | NautilusWsMessage::Deltas(_)
1068                            | NautilusWsMessage::Candle(_)
1069                            | NautilusWsMessage::MarkPrice(_)
1070                            | NautilusWsMessage::IndexPrice(_)
1071                            | NautilusWsMessage::FundingRate(_) => {}
1072                        }
1073                    }
1074                    None => {
1075                        tracing::warn!("WebSocket next_event returned None");
1076                        break;
1077                    }
1078                }
1079            }
1080        });
1081
1082        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1083        tracing::info!("Hyperliquid WebSocket execution stream started");
1084        Ok(())
1085    }
1086}
1087
1088fn dispatch_execution_report(report: ExecutionReport) {
1089    let sender = get_exec_event_sender();
1090    match report {
1091        ExecutionReport::Order(order_report) => {
1092            let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1093            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1094                tracing::warn!("Failed to send order status report: {e}");
1095            }
1096        }
1097        ExecutionReport::Fill(fill_report) => {
1098            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1099            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1100                tracing::warn!("Failed to send fill report: {e}");
1101            }
1102        }
1103    }
1104}