nautilus_hyperliquid/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use nautilus_common::{
22    messages::{
23        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
24        execution::{
25            BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
26            SubmitOrder, SubmitOrderList,
27        },
28    },
29    runner::get_exec_event_sender,
30    runtime::get_runtime,
31};
32use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
33use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
34use nautilus_model::{
35    accounts::AccountAny,
36    enums::{OmsType, OrderType},
37    identifiers::{AccountId, ClientId, Venue},
38    orders::{Order, any::OrderAny},
39    types::{AccountBalance, MarginBalance},
40};
41use serde_json;
42use tokio::task::JoinHandle;
43
44use crate::{
45    common::{
46        HyperliquidProductType,
47        consts::HYPERLIQUID_VENUE,
48        credential::Secrets,
49        parse::{
50            client_order_id_to_cancel_request, extract_error_message, is_response_successful,
51            order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
52        },
53    },
54    config::HyperliquidExecClientConfig,
55    http::{client::HyperliquidHttpClient, models::ClearinghouseState, query::ExchangeAction},
56    websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
57};
58
59#[derive(Debug)]
60pub struct HyperliquidExecutionClient {
61    core: ExecutionClientCore,
62    config: HyperliquidExecClientConfig,
63    http_client: HyperliquidHttpClient,
64    ws_client: HyperliquidWebSocketClient,
65    started: bool,
66    connected: bool,
67    instruments_initialized: bool,
68    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
69    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
70}
71
72impl HyperliquidExecutionClient {
73    /// Returns a reference to the configuration.
74    pub fn config(&self) -> &HyperliquidExecClientConfig {
75        &self.config
76    }
77
78    /// Validates order before submission to catch issues early.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the order cannot be submitted to Hyperliquid.
83    ///
84    /// # Supported Order Types
85    ///
86    /// - `Market`: Standard market orders
87    /// - `Limit`: Limit orders with GTC/IOC/ALO time-in-force
88    /// - `StopMarket`: Stop loss / protective stop with market execution
89    /// - `StopLimit`: Stop loss / protective stop with limit price
90    /// - `MarketIfTouched`: Profit taking / entry order with market execution
91    /// - `LimitIfTouched`: Profit taking / entry order with limit price
92    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
93        // Check if instrument symbol is supported
94        // Hyperliquid instruments: {base}-USD-PERP or {base}-{quote}-SPOT
95        let symbol = order.instrument_id().symbol.to_string();
96        if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
97            anyhow::bail!(
98                "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
99            );
100        }
101
102        // Check if order type is supported
103        match order.order_type() {
104            OrderType::Market
105            | OrderType::Limit
106            | OrderType::StopMarket
107            | OrderType::StopLimit
108            | OrderType::MarketIfTouched
109            | OrderType::LimitIfTouched => {}
110            _ => anyhow::bail!(
111                "Unsupported order type for Hyperliquid: {:?}",
112                order.order_type()
113            ),
114        }
115
116        // Check if conditional orders have trigger price
117        if matches!(
118            order.order_type(),
119            OrderType::StopMarket
120                | OrderType::StopLimit
121                | OrderType::MarketIfTouched
122                | OrderType::LimitIfTouched
123        ) && order.trigger_price().is_none()
124        {
125            anyhow::bail!(
126                "Conditional orders require a trigger price for Hyperliquid: {:?}",
127                order.order_type()
128            );
129        }
130
131        // Check if limit-based orders have price
132        if matches!(
133            order.order_type(),
134            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
135        ) && order.price().is_none()
136        {
137            anyhow::bail!(
138                "Limit orders require a limit price for Hyperliquid: {:?}",
139                order.order_type()
140            );
141        }
142
143        Ok(())
144    }
145
146    /// Creates a new [`HyperliquidExecutionClient`].
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if either the HTTP or WebSocket client fail to construct.
151    pub fn new(
152        core: ExecutionClientCore,
153        config: HyperliquidExecClientConfig,
154    ) -> anyhow::Result<Self> {
155        if !config.has_credentials() {
156            anyhow::bail!("Hyperliquid execution client requires private key");
157        }
158
159        let secrets = Secrets::from_json(&format!(
160            r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
161            config.private_key, config.is_testnet
162        ))
163        .context("failed to create secrets from private key")?;
164
165        let http_client = HyperliquidHttpClient::with_credentials(
166            &secrets,
167            Some(config.http_timeout_secs),
168            config.http_proxy_url.clone(),
169        )
170        .context("failed to create Hyperliquid HTTP client")?;
171
172        // Create WebSocket client (will connect when needed)
173        // Note: For execution WebSocket (private account messages), product type is less critical
174        // since messages are account-scoped. Defaulting to Perp.
175        let ws_client = HyperliquidWebSocketClient::new(
176            None,
177            config.is_testnet,
178            HyperliquidProductType::Perp,
179            Some(core.account_id),
180        );
181
182        Ok(Self {
183            core,
184            config,
185            http_client,
186            ws_client,
187            started: false,
188            connected: false,
189            instruments_initialized: false,
190            pending_tasks: Mutex::new(Vec::new()),
191            ws_stream_handle: Mutex::new(None),
192        })
193    }
194
195    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
196        if self.instruments_initialized {
197            return Ok(());
198        }
199
200        let instruments = self
201            .http_client
202            .request_instruments()
203            .await
204            .context("failed to request Hyperliquid instruments")?;
205
206        if instruments.is_empty() {
207            tracing::warn!(
208                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
209            );
210        } else {
211            tracing::info!("Initialized {} instruments", instruments.len());
212
213            for instrument in &instruments {
214                self.http_client.cache_instrument(instrument.clone());
215            }
216        }
217
218        self.instruments_initialized = true;
219        Ok(())
220    }
221
222    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
223        if self.instruments_initialized {
224            return Ok(());
225        }
226
227        let runtime = get_runtime();
228        runtime.block_on(self.ensure_instruments_initialized_async())
229    }
230
231    async fn refresh_account_state(&self) -> anyhow::Result<()> {
232        // Get account information from Hyperliquid using the user address
233        // We need to derive the user address from the private key in the config
234        let user_address = self.get_user_address()?;
235
236        // Use vault address if configured, otherwise use user address
237        let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
238
239        // Query clearinghouseState endpoint to get balances and margin info
240        let clearinghouse_state = self
241            .http_client
242            .info_clearinghouse_state(account_address)
243            .await
244            .context("failed to fetch clearinghouse state")?;
245
246        // Deserialize the response
247        let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
248            .context("failed to deserialize clearinghouse state")?;
249
250        tracing::debug!(
251            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
252            state.cross_margin_summary,
253            state.asset_positions.len()
254        );
255
256        // Parse balances and margins from cross margin summary
257        if let Some(ref cross_margin_summary) = state.cross_margin_summary {
258            let (balances, margins) =
259                crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
260                    .context("failed to parse account balances and margins")?;
261
262            let ts_event = if let Some(time_ms) = state.time {
263                nautilus_core::UnixNanos::from(time_ms * 1_000_000)
264            } else {
265                nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
266            };
267
268            // Generate account state event
269            self.core.generate_account_state(
270                balances, margins, true, // reported
271                ts_event,
272            )?;
273
274            tracing::info!("Account state updated successfully");
275        } else {
276            tracing::warn!("No cross margin summary in clearinghouse state");
277        }
278
279        Ok(())
280    }
281
282    fn get_user_address(&self) -> anyhow::Result<String> {
283        let address = self
284            .http_client
285            .get_user_address()
286            .context("failed to get user address from HTTP client")?;
287
288        Ok(address)
289    }
290
291    fn spawn_task<F>(&self, description: &'static str, fut: F)
292    where
293        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
294    {
295        let runtime = get_runtime();
296        let handle = runtime.spawn(async move {
297            if let Err(e) = fut.await {
298                tracing::warn!("{description} failed: {e:?}");
299            }
300        });
301
302        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
303        tasks.retain(|handle| !handle.is_finished());
304        tasks.push(handle);
305    }
306
307    fn abort_pending_tasks(&self) {
308        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
309        for handle in tasks.drain(..) {
310            handle.abort();
311        }
312    }
313
314    fn update_account_state(&self) -> anyhow::Result<()> {
315        let runtime = get_runtime();
316        runtime.block_on(self.refresh_account_state())
317    }
318}
319
320impl ExecutionClient for HyperliquidExecutionClient {
321    fn is_connected(&self) -> bool {
322        self.connected
323    }
324
325    fn client_id(&self) -> ClientId {
326        self.core.client_id
327    }
328
329    fn account_id(&self) -> AccountId {
330        self.core.account_id
331    }
332
333    fn venue(&self) -> Venue {
334        *HYPERLIQUID_VENUE
335    }
336
337    fn oms_type(&self) -> OmsType {
338        self.core.oms_type
339    }
340
341    fn get_account(&self) -> Option<AccountAny> {
342        self.core.get_account()
343    }
344
345    fn generate_account_state(
346        &self,
347        balances: Vec<AccountBalance>,
348        margins: Vec<MarginBalance>,
349        reported: bool,
350        ts_event: UnixNanos,
351    ) -> anyhow::Result<()> {
352        self.core
353            .generate_account_state(balances, margins, reported, ts_event)
354    }
355
356    fn start(&mut self) -> anyhow::Result<()> {
357        if self.started {
358            return Ok(());
359        }
360
361        tracing::info!(
362            client_id = %self.core.client_id,
363            account_id = %self.core.account_id,
364            is_testnet = self.config.is_testnet,
365            vault_address = ?self.config.vault_address,
366            http_proxy_url = ?self.config.http_proxy_url,
367            ws_proxy_url = ?self.config.ws_proxy_url,
368            "Starting Hyperliquid execution client"
369        );
370
371        // Ensure instruments are initialized
372        self.ensure_instruments_initialized()?;
373
374        // Initialize account state
375        if let Err(e) = self.update_account_state() {
376            tracing::warn!("Failed to initialize account state: {e}");
377        }
378
379        self.connected = true;
380        self.started = true;
381
382        // Start WebSocket stream for execution updates
383        if let Err(e) = get_runtime().block_on(self.start_ws_stream()) {
384            tracing::warn!("Failed to start WebSocket stream: {e}");
385        }
386
387        tracing::info!("Hyperliquid execution client started");
388        Ok(())
389    }
390    fn stop(&mut self) -> anyhow::Result<()> {
391        if !self.started {
392            return Ok(());
393        }
394
395        tracing::info!("Stopping Hyperliquid execution client");
396
397        // Stop WebSocket stream
398        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
399            handle.abort();
400        }
401
402        // Abort any pending tasks
403        self.abort_pending_tasks();
404
405        // Disconnect WebSocket
406        if self.connected {
407            let runtime = get_runtime();
408            runtime.block_on(async {
409                if let Err(e) = self.ws_client.disconnect().await {
410                    tracing::warn!("Error disconnecting WebSocket client: {e}");
411                }
412            });
413        }
414
415        self.connected = false;
416        self.started = false;
417
418        tracing::info!("Hyperliquid execution client stopped");
419        Ok(())
420    }
421
422    fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
423        let order = &command.order;
424
425        if order.is_closed() {
426            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
427            return Ok(());
428        }
429
430        if let Err(e) = self.validate_order_submission(order) {
431            self.core.generate_order_rejected(
432                order.strategy_id(),
433                order.instrument_id(),
434                order.client_order_id(),
435                &format!("validation-error: {e}"),
436                command.ts_init,
437                false,
438            );
439            return Err(e);
440        }
441
442        self.core.generate_order_submitted(
443            order.strategy_id(),
444            order.instrument_id(),
445            order.client_order_id(),
446            command.ts_init,
447        );
448
449        let http_client = self.http_client.clone();
450        let order_clone = order.clone();
451
452        self.spawn_task("submit_order", async move {
453            match order_any_to_hyperliquid_request(&order_clone) {
454                Ok(hyperliquid_order) => {
455                    // Create exchange action for order placement with typed struct
456                    let action = ExchangeAction::order(vec![hyperliquid_order]);
457
458                    match http_client.post_action(&action).await {
459                        Ok(response) => {
460                            if is_response_successful(&response) {
461                                tracing::info!("Order submitted successfully: {:?}", response);
462                                // Order acceptance/rejection events will be generated from WebSocket updates
463                                // which provide the venue_order_id and definitive status
464                            } else {
465                                let error_msg = extract_error_message(&response);
466                                tracing::warn!(
467                                    "Order submission rejected by exchange: {}",
468                                    error_msg
469                                );
470                                // Order rejection event will be generated from WebSocket updates
471                            }
472                        }
473                        Err(e) => {
474                            tracing::warn!("Order submission HTTP request failed: {e}");
475                            // WebSocket reconnection and order reconciliation will handle recovery
476                        }
477                    }
478                }
479                Err(e) => {
480                    tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
481                    // This indicates a client-side bug or unsupported order configuration
482                }
483            }
484
485            Ok(())
486        });
487
488        Ok(())
489    }
490
491    fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
492        tracing::debug!(
493            "Submitting order list with {} orders",
494            command.order_list.orders.len()
495        );
496
497        let http_client = self.http_client.clone();
498        let orders: Vec<OrderAny> = command.order_list.orders.clone();
499
500        // Generate submitted events for all orders
501        for order in &orders {
502            self.core.generate_order_submitted(
503                order.strategy_id(),
504                order.instrument_id(),
505                order.client_order_id(),
506                command.ts_init,
507            );
508        }
509
510        self.spawn_task("submit_order_list", async move {
511            // Convert all orders to Hyperliquid format
512            let order_refs: Vec<&OrderAny> = orders.iter().collect();
513            match orders_to_hyperliquid_requests(&order_refs) {
514                Ok(hyperliquid_orders) => {
515                    // Create exchange action for order placement with typed struct
516                    let action = ExchangeAction::order(hyperliquid_orders);
517                    match http_client.post_action(&action).await {
518                        Ok(response) => {
519                            if is_response_successful(&response) {
520                                tracing::info!("Order list submitted successfully: {:?}", response);
521                                // Order acceptance/rejection events will be generated from WebSocket updates
522                            } else {
523                                let error_msg = extract_error_message(&response);
524                                tracing::warn!(
525                                    "Order list submission rejected by exchange: {}",
526                                    error_msg
527                                );
528                                // Individual order rejection events will be generated from WebSocket updates
529                            }
530                        }
531                        Err(e) => {
532                            tracing::warn!("Order list submission HTTP request failed: {e}");
533                            // WebSocket reconciliation will handle recovery
534                        }
535                    }
536                }
537                Err(e) => {
538                    tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
539                }
540            }
541
542            Ok(())
543        });
544
545        Ok(())
546    }
547
548    fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
549        tracing::debug!("Modifying order: {:?}", command);
550
551        // Parse venue_order_id as u64
552        let oid: u64 = match command.venue_order_id.as_str().parse() {
553            Ok(id) => id,
554            Err(e) => {
555                tracing::warn!(
556                    "Failed to parse venue_order_id '{}' as u64: {}",
557                    command.venue_order_id,
558                    e
559                );
560                return Ok(());
561            }
562        };
563
564        let http_client = self.http_client.clone();
565        let price = command.price;
566        let quantity = command.quantity;
567        let symbol = command.instrument_id.symbol.to_string();
568
569        self.spawn_task("modify_order", async move {
570            use crate::{
571                common::parse::extract_asset_id_from_symbol,
572                http::models::HyperliquidExecModifyOrderRequest,
573            };
574
575            // Extract asset ID from instrument symbol
576            let asset = match extract_asset_id_from_symbol(&symbol) {
577                Ok(asset) => asset,
578                Err(e) => {
579                    tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
580                    return Ok(());
581                }
582            };
583
584            // Build typed modify request with new price and/or quantity
585            let modify_request = HyperliquidExecModifyOrderRequest {
586                asset,
587                oid,
588                price: price.map(|p| (*p).into()),
589                size: quantity.map(|q| (*q).into()),
590                reduce_only: None,
591                kind: None,
592            };
593
594            let action = ExchangeAction::modify(oid, modify_request);
595
596            match http_client.post_action(&action).await {
597                Ok(response) => {
598                    if is_response_successful(&response) {
599                        tracing::info!("Order modified successfully: {:?}", response);
600                        // Order update events will be generated from WebSocket updates
601                    } else {
602                        let error_msg = extract_error_message(&response);
603                        tracing::warn!("Order modification rejected by exchange: {}", error_msg);
604                        // Order modify rejected events will be generated from WebSocket updates
605                    }
606                }
607                Err(e) => {
608                    tracing::warn!("Order modification HTTP request failed: {e}");
609                    // WebSocket reconciliation will handle recovery
610                }
611            }
612
613            Ok(())
614        });
615
616        Ok(())
617    }
618
619    fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
620        tracing::debug!("Cancelling order: {:?}", command);
621
622        let http_client = self.http_client.clone();
623        let client_order_id = command.client_order_id.to_string();
624        let symbol = command.instrument_id.symbol.to_string();
625
626        self.spawn_task("cancel_order", async move {
627            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
628                Ok(cancel_request) => {
629                    // Create exchange action for order cancellation with typed struct
630                    let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
631                    match http_client.post_action(&action).await {
632                        Ok(response) => {
633                            if is_response_successful(&response) {
634                                tracing::info!("Order cancelled successfully: {:?}", response);
635                                // Order cancelled events will be generated from WebSocket updates
636                                // which provide definitive confirmation and venue_order_id
637                            } else {
638                                let error_msg = extract_error_message(&response);
639                                tracing::warn!(
640                                    "Order cancellation rejected by exchange: {}",
641                                    error_msg
642                                );
643                                // Order cancel rejected events will be generated from WebSocket updates
644                            }
645                        }
646                        Err(e) => {
647                            tracing::warn!("Order cancellation HTTP request failed: {e}");
648                            // WebSocket reconnection and reconciliation will handle recovery
649                        }
650                    }
651                }
652                Err(e) => {
653                    tracing::warn!(
654                        "Failed to convert order to Hyperliquid cancel format: {:?}",
655                        e
656                    );
657                }
658            }
659
660            Ok(())
661        });
662
663        Ok(())
664    }
665
666    fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
667        tracing::debug!("Cancelling all orders: {:?}", command);
668
669        // Query cache for all open orders matching the instrument and side
670        let cache = self.core.cache().borrow();
671        let open_orders = cache.orders_open(
672            Some(&self.core.venue),
673            Some(&command.instrument_id),
674            None,
675            Some(command.order_side),
676        );
677
678        if open_orders.is_empty() {
679            tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
680            return Ok(());
681        }
682
683        // Convert orders to cancel requests
684        let mut cancel_requests = Vec::new();
685        for order in open_orders {
686            let client_order_id = order.client_order_id().to_string();
687            let symbol = command.instrument_id.symbol.to_string();
688
689            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
690                Ok(req) => cancel_requests.push(req),
691                Err(e) => {
692                    tracing::warn!(
693                        "Failed to convert order {} to cancel request: {}",
694                        client_order_id,
695                        e
696                    );
697                    continue;
698                }
699            }
700        }
701
702        if cancel_requests.is_empty() {
703            tracing::debug!("No valid cancel requests to send");
704            return Ok(());
705        }
706
707        // Create exchange action for cancellation with typed struct
708        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
709
710        // Send cancel request via HTTP API
711        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
712        let http_client = self.http_client.clone();
713        let runtime = get_runtime();
714        runtime.spawn(async move {
715            if let Err(e) = http_client.post_action(&action).await {
716                tracing::warn!("Failed to send cancel all orders request: {e}");
717            }
718        });
719
720        Ok(())
721    }
722
723    fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
724        tracing::debug!("Batch cancelling orders: {:?}", command);
725
726        if command.cancels.is_empty() {
727            tracing::debug!("No orders to cancel in batch");
728            return Ok(());
729        }
730
731        // Convert each CancelOrder to a cancel request
732        let mut cancel_requests = Vec::new();
733        for cancel_cmd in &command.cancels {
734            let client_order_id = cancel_cmd.client_order_id.to_string();
735            let symbol = cancel_cmd.instrument_id.symbol.to_string();
736
737            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
738                Ok(req) => cancel_requests.push(req),
739                Err(e) => {
740                    tracing::warn!(
741                        "Failed to convert order {} to cancel request: {}",
742                        client_order_id,
743                        e
744                    );
745                    continue;
746                }
747            }
748        }
749
750        if cancel_requests.is_empty() {
751            tracing::warn!("No valid cancel requests in batch");
752            return Ok(());
753        }
754
755        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
756
757        // Send batch cancel request via HTTP API
758        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
759        let http_client = self.http_client.clone();
760        let runtime = get_runtime();
761        runtime.spawn(async move {
762            if let Err(e) = http_client.post_action(&action).await {
763                tracing::warn!("Failed to send batch cancel orders request: {e}");
764            }
765        });
766
767        Ok(())
768    }
769
770    fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
771        tracing::debug!("Querying account: {:?}", command);
772
773        // Use existing infrastructure to refresh account state
774        let runtime = get_runtime();
775        runtime.block_on(async {
776            if let Err(e) = self.refresh_account_state().await {
777                tracing::warn!("Failed to query account state: {e}");
778            }
779        });
780
781        Ok(())
782    }
783
784    fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
785        tracing::debug!("Querying order: {:?}", command);
786
787        // Get venue order ID from cache
788        let cache = self.core.cache().borrow();
789        let venue_order_id = cache.venue_order_id(&command.client_order_id);
790
791        let venue_order_id = match venue_order_id {
792            Some(oid) => *oid,
793            None => {
794                tracing::warn!(
795                    "No venue order ID found for client order {}",
796                    command.client_order_id
797                );
798                return Ok(());
799            }
800        };
801        drop(cache);
802
803        // Parse venue order ID to u64
804        let oid = match u64::from_str(venue_order_id.as_ref()) {
805            Ok(id) => id,
806            Err(e) => {
807                tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
808                return Ok(());
809            }
810        };
811
812        // Get user address for the query
813        let user_address = self.get_user_address()?;
814
815        // Query order status via HTTP API
816        // Note: The WebSocket connection is the authoritative source for order updates,
817        // this is primarily for reconciliation or when WebSocket is unavailable
818        let http_client = self.http_client.clone();
819        let runtime = get_runtime();
820        runtime.spawn(async move {
821            match http_client.info_order_status(&user_address, oid).await {
822                Ok(status) => {
823                    tracing::debug!("Order status for oid {}: {:?}", oid, status);
824                }
825                Err(e) => {
826                    tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
827                }
828            }
829        });
830
831        Ok(())
832    }
833}
834
835////////////////////////////////////////////////////////////////////////////////
836// LiveExecutionClient Implementation
837////////////////////////////////////////////////////////////////////////////////
838
839use async_trait::async_trait;
840use nautilus_common::messages::execution::{
841    GenerateFillReports, GenerateOrderStatusReport, GeneratePositionReports,
842};
843use nautilus_live::execution::client::LiveExecutionClient;
844use nautilus_model::reports::{
845    ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport,
846};
847
848#[async_trait(?Send)]
849impl LiveExecutionClient for HyperliquidExecutionClient {
850    fn get_message_channel(
851        &self,
852    ) -> tokio::sync::mpsc::UnboundedSender<nautilus_common::messages::ExecutionEvent> {
853        get_exec_event_sender()
854    }
855
856    fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
857        self.core.clock().borrow()
858    }
859
860    async fn connect(&mut self) -> anyhow::Result<()> {
861        if self.connected {
862            return Ok(());
863        }
864
865        tracing::info!("Connecting Hyperliquid execution client");
866
867        // Ensure instruments are initialized
868        self.ensure_instruments_initialized_async().await?;
869
870        // Connect WebSocket client
871        self.ws_client.connect().await?;
872
873        // Subscribe to user-specific order updates and fills
874        let user_address = self.get_user_address()?;
875        self.ws_client
876            .subscribe_all_user_channels(&user_address)
877            .await?;
878
879        // Initialize account state
880        self.refresh_account_state().await?;
881
882        // Note: Order reconciliation is handled by the execution engine
883        // which will call generate_order_status_reports() after connection
884
885        self.connected = true;
886        self.core.set_connected(true);
887
888        // Start WebSocket stream for execution updates
889        if let Err(e) = self.start_ws_stream().await {
890            tracing::warn!("Failed to start WebSocket stream: {e}");
891        }
892
893        tracing::info!(client_id = %self.core.client_id, "Connected");
894        Ok(())
895    }
896
897    async fn disconnect(&mut self) -> anyhow::Result<()> {
898        if !self.connected {
899            return Ok(());
900        }
901
902        tracing::info!("Disconnecting Hyperliquid execution client");
903
904        // Disconnect WebSocket
905        self.ws_client.disconnect().await?;
906
907        // Abort any pending tasks
908        self.abort_pending_tasks();
909
910        self.connected = false;
911        self.core.set_connected(false);
912
913        tracing::info!(client_id = %self.core.client_id, "Disconnected");
914        Ok(())
915    }
916
917    async fn generate_order_status_report(
918        &self,
919        _cmd: &GenerateOrderStatusReport,
920    ) -> anyhow::Result<Option<OrderStatusReport>> {
921        // NOTE: Single order status report generation requires instrument cache integration.
922        // The HTTP client methods and parsing functions are implemented and ready to use.
923        // When implemented: query via info_order_status(), parse with parse_order_status_report_from_basic().
924        tracing::warn!("generate_order_status_report not yet fully implemented");
925        Ok(None)
926    }
927
928    async fn generate_order_status_reports(
929        &self,
930        cmd: &GenerateOrderStatusReport,
931    ) -> anyhow::Result<Vec<OrderStatusReport>> {
932        let user_address = self.get_user_address()?;
933
934        let reports = self
935            .http_client
936            .request_order_status_reports(&user_address, cmd.instrument_id)
937            .await
938            .context("failed to generate order status reports")?;
939
940        // Filter by client_order_id if specified
941        let reports = if let Some(client_order_id) = cmd.client_order_id {
942            reports
943                .into_iter()
944                .filter(|r| r.client_order_id == Some(client_order_id))
945                .collect()
946        } else {
947            reports
948        };
949
950        // Note: cmd.venue_order_id is Option<ClientOrderId> in the struct definition,
951        // but report venue_order_id is VenueOrderId - type mismatch prevents filtering here
952
953        tracing::info!("Generated {} order status reports", reports.len());
954        Ok(reports)
955    }
956
957    async fn generate_fill_reports(
958        &self,
959        cmd: GenerateFillReports,
960    ) -> anyhow::Result<Vec<FillReport>> {
961        let user_address = self.get_user_address()?;
962
963        let reports = self
964            .http_client
965            .request_fill_reports(&user_address, cmd.instrument_id)
966            .await
967            .context("failed to generate fill reports")?;
968
969        // Filter by time range if specified
970        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
971            reports
972                .into_iter()
973                .filter(|r| r.ts_event >= start && r.ts_event <= end)
974                .collect()
975        } else if let Some(start) = cmd.start {
976            reports
977                .into_iter()
978                .filter(|r| r.ts_event >= start)
979                .collect()
980        } else if let Some(end) = cmd.end {
981            reports.into_iter().filter(|r| r.ts_event <= end).collect()
982        } else {
983            reports
984        };
985
986        tracing::info!("Generated {} fill reports", reports.len());
987        Ok(reports)
988    }
989
990    async fn generate_position_status_reports(
991        &self,
992        cmd: &GeneratePositionReports,
993    ) -> anyhow::Result<Vec<PositionStatusReport>> {
994        let user_address = self.get_user_address()?;
995
996        let reports = self
997            .http_client
998            .request_position_status_reports(&user_address, cmd.instrument_id)
999            .await
1000            .context("failed to generate position status reports")?;
1001
1002        tracing::info!("Generated {} position status reports", reports.len());
1003        Ok(reports)
1004    }
1005
1006    async fn generate_mass_status(
1007        &self,
1008        lookback_mins: Option<u64>,
1009    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1010        tracing::warn!(
1011            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
1012        );
1013        // Full implementation would require:
1014        // 1. Query all orders within lookback window
1015        // 2. Query all fills within lookback window
1016        // 3. Query all positions
1017        // 4. Combine into ExecutionMassStatus
1018        Ok(None)
1019    }
1020}
1021
1022impl HyperliquidExecutionClient {
1023    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1024        {
1025            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1026            if handle_guard.is_some() {
1027                return Ok(());
1028            }
1029        }
1030
1031        let user_address = self.get_user_address()?;
1032        let _account_id = self.core.account_id;
1033        let mut ws_client = self.ws_client.clone();
1034
1035        let instruments = self
1036            .http_client
1037            .request_instruments()
1038            .await
1039            .unwrap_or_default();
1040
1041        for instrument in instruments {
1042            ws_client.cache_instrument(instrument);
1043        }
1044
1045        let runtime = get_runtime();
1046        let handle = runtime.spawn(async move {
1047            if let Err(e) = ws_client.connect().await {
1048                tracing::warn!("Failed to connect WebSocket: {e}");
1049                return;
1050            }
1051
1052            if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1053                tracing::warn!("Failed to subscribe to order updates: {e}");
1054                return;
1055            }
1056
1057            if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1058                tracing::warn!("Failed to subscribe to user events: {e}");
1059                return;
1060            }
1061
1062            tracing::info!("Subscribed to Hyperliquid execution updates");
1063
1064            let _clock = get_atomic_clock_realtime();
1065
1066            loop {
1067                let event = ws_client.next_event().await;
1068
1069                match event {
1070                    Some(msg) => {
1071                        match msg {
1072                            NautilusWsMessage::ExecutionReports(reports) => {
1073                                // Handler already parsed the messages, just dispatch them
1074                                for report in reports {
1075                                    dispatch_execution_report(report);
1076                                }
1077                            }
1078                            NautilusWsMessage::Reconnected => {
1079                                tracing::info!("WebSocket reconnected");
1080                                // TODO: Resubscribe to user channels if needed
1081                            }
1082                            NautilusWsMessage::Error(e) => {
1083                                tracing::error!("WebSocket error: {e}");
1084                            }
1085                            // Handled by data client
1086                            NautilusWsMessage::Trades(_)
1087                            | NautilusWsMessage::Quote(_)
1088                            | NautilusWsMessage::Deltas(_)
1089                            | NautilusWsMessage::Candle(_)
1090                            | NautilusWsMessage::MarkPrice(_)
1091                            | NautilusWsMessage::IndexPrice(_)
1092                            | NautilusWsMessage::FundingRate(_) => {}
1093                        }
1094                    }
1095                    None => {
1096                        tracing::warn!("WebSocket next_event returned None");
1097                        break;
1098                    }
1099                }
1100            }
1101        });
1102
1103        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1104        tracing::info!("Hyperliquid WebSocket execution stream started");
1105        Ok(())
1106    }
1107}
1108
1109fn dispatch_execution_report(report: ExecutionReport) {
1110    let sender = get_exec_event_sender();
1111    match report {
1112        ExecutionReport::Order(order_report) => {
1113            let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1114            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1115                tracing::warn!("Failed to send order status report: {e}");
1116            }
1117        }
1118        ExecutionReport::Fill(fill_report) => {
1119            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1120            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1121                tracing::warn!("Failed to send fill report: {e}");
1122            }
1123        }
1124    }
1125}