Skip to main content

nautilus_hyperliquid/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{
19    str::FromStr,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use ahash::AHashSet;
25use anyhow::Context;
26use async_trait::async_trait;
27use nautilus_common::{
28    clients::ExecutionClient,
29    live::{runner::get_exec_event_sender, runtime::get_runtime},
30    messages::{
31        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
32        execution::{
33            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
34            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
35            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
36        },
37    },
38};
39use nautilus_core::{
40    MUTEX_POISONED, UUID4, UnixNanos,
41    time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45    accounts::AccountAny,
46    enums::{AccountType, OmsType, OrderStatus, OrderType},
47    identifiers::{AccountId, ClientId, ClientOrderId, Venue},
48    orders::{Order, any::OrderAny},
49    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
50    types::{AccountBalance, MarginBalance},
51};
52use tokio::task::JoinHandle;
53use ustr::Ustr;
54
55use crate::{
56    common::{
57        consts::{HYPERLIQUID_VENUE, NAUTILUS_BUILDER_FEE_ADDRESS, NAUTILUS_BUILDER_FEE_TENTHS_BP},
58        credential::Secrets,
59        parse::{
60            client_order_id_to_cancel_request_with_asset, extract_error_message,
61            is_response_successful, order_to_hyperliquid_request_with_asset,
62            parse_account_balances_and_margins,
63        },
64    },
65    config::HyperliquidExecClientConfig,
66    http::{
67        client::HyperliquidHttpClient,
68        models::{
69            ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
70            HyperliquidExecGrouping, HyperliquidExecModifyOrderRequest,
71        },
72    },
73    websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
74};
75
76#[derive(Debug)]
77pub struct HyperliquidExecutionClient {
78    core: ExecutionClientCore,
79    clock: &'static AtomicTime,
80    config: HyperliquidExecClientConfig,
81    emitter: ExecutionEventEmitter,
82    http_client: HyperliquidHttpClient,
83    ws_client: HyperliquidWebSocketClient,
84    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
85    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
86}
87
88impl HyperliquidExecutionClient {
89    /// Returns a reference to the configuration.
90    pub fn config(&self) -> &HyperliquidExecClientConfig {
91        &self.config
92    }
93
94    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
95        // Check if instrument symbol is supported
96        // Hyperliquid instruments: {base}-USD-PERP or {base}-{quote}-SPOT
97        let instrument_id = order.instrument_id();
98        let symbol = instrument_id.symbol.as_str();
99        if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
100            anyhow::bail!(
101                "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
102            );
103        }
104
105        // Check if order type is supported
106        match order.order_type() {
107            OrderType::Market
108            | OrderType::Limit
109            | OrderType::StopMarket
110            | OrderType::StopLimit
111            | OrderType::MarketIfTouched
112            | OrderType::LimitIfTouched => {}
113            _ => anyhow::bail!(
114                "Unsupported order type for Hyperliquid: {:?}",
115                order.order_type()
116            ),
117        }
118
119        // Check if conditional orders have trigger price
120        if matches!(
121            order.order_type(),
122            OrderType::StopMarket
123                | OrderType::StopLimit
124                | OrderType::MarketIfTouched
125                | OrderType::LimitIfTouched
126        ) && order.trigger_price().is_none()
127        {
128            anyhow::bail!(
129                "Conditional orders require a trigger price for Hyperliquid: {:?}",
130                order.order_type()
131            );
132        }
133
134        // Check if limit-based orders have price
135        if matches!(
136            order.order_type(),
137            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
138        ) && order.price().is_none()
139        {
140            anyhow::bail!(
141                "Limit orders require a limit price for Hyperliquid: {:?}",
142                order.order_type()
143            );
144        }
145
146        Ok(())
147    }
148
149    /// Creates a new [`HyperliquidExecutionClient`].
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if either the HTTP or WebSocket client fail to construct.
154    pub fn new(
155        core: ExecutionClientCore,
156        config: HyperliquidExecClientConfig,
157    ) -> anyhow::Result<Self> {
158        if !config.has_credentials() {
159            anyhow::bail!("Hyperliquid execution client requires private key");
160        }
161
162        let secrets = Secrets::from_json(&format!(
163            r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
164            config.private_key, config.is_testnet
165        ))
166        .context("failed to create secrets from private key")?;
167
168        let mut http_client = HyperliquidHttpClient::with_secrets(
169            &secrets,
170            Some(config.http_timeout_secs),
171            config.http_proxy_url.clone(),
172        )
173        .context("failed to create Hyperliquid HTTP client")?;
174
175        http_client.set_account_id(core.account_id);
176
177        // Create WebSocket client for order/execution updates
178        let ws_client =
179            HyperliquidWebSocketClient::new(None, config.is_testnet, Some(core.account_id));
180
181        let clock = get_atomic_clock_realtime();
182        let emitter = ExecutionEventEmitter::new(
183            clock,
184            core.trader_id,
185            core.account_id,
186            AccountType::Margin,
187            None,
188        );
189
190        Ok(Self {
191            core,
192            clock,
193            config,
194            emitter,
195            http_client,
196            ws_client,
197            pending_tasks: Mutex::new(Vec::new()),
198            ws_stream_handle: Mutex::new(None),
199        })
200    }
201
202    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
203        if self.core.instruments_initialized() {
204            return Ok(());
205        }
206
207        let instruments = self
208            .http_client
209            .request_instruments()
210            .await
211            .context("failed to request Hyperliquid instruments")?;
212
213        if instruments.is_empty() {
214            log::warn!(
215                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
216            );
217        } else {
218            log::info!("Initialized {} instruments", instruments.len());
219
220            for instrument in &instruments {
221                self.http_client.cache_instrument(instrument.clone());
222            }
223        }
224
225        self.core.set_instruments_initialized();
226        Ok(())
227    }
228
229    async fn refresh_account_state(&self) -> anyhow::Result<()> {
230        let account_address = self.get_account_address()?;
231
232        let clearinghouse_state = self
233            .http_client
234            .info_clearinghouse_state(&account_address)
235            .await
236            .context("failed to fetch clearinghouse state")?;
237
238        // Deserialize the response
239        let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
240            .context("failed to deserialize clearinghouse state")?;
241
242        log::debug!(
243            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
244            state.cross_margin_summary,
245            state.asset_positions.len()
246        );
247
248        // Parse balances and margins from cross margin summary
249        if let Some(ref cross_margin_summary) = state.cross_margin_summary {
250            let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
251                .context("failed to parse account balances and margins")?;
252
253            // Generate account state event
254            let ts_event = self.clock.get_time_ns();
255            self.emitter
256                .emit_account_state(balances, margins, true, ts_event);
257
258            log::info!("Account state updated successfully");
259        } else {
260            log::warn!("No cross margin summary in clearinghouse state");
261        }
262
263        Ok(())
264    }
265
266    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
267        let account_id = self.core.account_id;
268
269        if self.core.cache().account(&account_id).is_some() {
270            log::info!("Account {account_id} registered");
271            return Ok(());
272        }
273
274        let start = Instant::now();
275        let timeout = Duration::from_secs_f64(timeout_secs);
276        let interval = Duration::from_millis(10);
277
278        loop {
279            tokio::time::sleep(interval).await;
280
281            if self.core.cache().account(&account_id).is_some() {
282                log::info!("Account {account_id} registered");
283                return Ok(());
284            }
285
286            if start.elapsed() >= timeout {
287                anyhow::bail!(
288                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
289                );
290            }
291        }
292    }
293
294    fn get_user_address(&self) -> anyhow::Result<String> {
295        self.http_client
296            .get_user_address()
297            .context("failed to get user address from HTTP client")
298    }
299
300    fn get_account_address(&self) -> anyhow::Result<String> {
301        match &self.config.vault_address {
302            Some(vault) => Ok(vault.clone()),
303            None => self.get_user_address(),
304        }
305    }
306
307    fn spawn_task<F>(&self, description: &'static str, fut: F)
308    where
309        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
310    {
311        let runtime = get_runtime();
312        let handle = runtime.spawn(async move {
313            if let Err(e) = fut.await {
314                log::warn!("{description} failed: {e:?}");
315            }
316        });
317
318        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
319        tasks.retain(|handle| !handle.is_finished());
320        tasks.push(handle);
321    }
322
323    fn abort_pending_tasks(&self) {
324        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
325        for handle in tasks.drain(..) {
326            handle.abort();
327        }
328    }
329}
330
331#[async_trait(?Send)]
332impl ExecutionClient for HyperliquidExecutionClient {
333    fn is_connected(&self) -> bool {
334        self.core.is_connected()
335    }
336
337    fn client_id(&self) -> ClientId {
338        self.core.client_id
339    }
340
341    fn account_id(&self) -> AccountId {
342        self.core.account_id
343    }
344
345    fn venue(&self) -> Venue {
346        *HYPERLIQUID_VENUE
347    }
348
349    fn oms_type(&self) -> OmsType {
350        self.core.oms_type
351    }
352
353    fn get_account(&self) -> Option<AccountAny> {
354        self.core.cache().account(&self.core.account_id).cloned()
355    }
356
357    fn generate_account_state(
358        &self,
359        balances: Vec<AccountBalance>,
360        margins: Vec<MarginBalance>,
361        reported: bool,
362        ts_event: UnixNanos,
363    ) -> anyhow::Result<()> {
364        self.emitter
365            .emit_account_state(balances, margins, reported, ts_event);
366        Ok(())
367    }
368
369    fn start(&mut self) -> anyhow::Result<()> {
370        if self.core.is_started() {
371            return Ok(());
372        }
373
374        let sender = get_exec_event_sender();
375        self.emitter.set_sender(sender);
376        self.core.set_started();
377
378        log::info!(
379            "Started: client_id={}, account_id={}, is_testnet={}, vault_address={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
380            self.core.client_id,
381            self.core.account_id,
382            self.config.is_testnet,
383            self.config.vault_address,
384            self.config.http_proxy_url,
385            self.config.ws_proxy_url,
386        );
387
388        Ok(())
389    }
390
391    fn stop(&mut self) -> anyhow::Result<()> {
392        if self.core.is_stopped() {
393            return Ok(());
394        }
395
396        log::info!("Stopping Hyperliquid execution client");
397
398        // Stop WebSocket stream
399        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
400            handle.abort();
401        }
402
403        // Abort any pending tasks
404        self.abort_pending_tasks();
405
406        // Disconnect WebSocket
407        if self.core.is_connected() {
408            let runtime = get_runtime();
409            runtime.block_on(async {
410                if let Err(e) = self.ws_client.disconnect().await {
411                    log::warn!("Error disconnecting WebSocket client: {e}");
412                }
413            });
414        }
415
416        self.core.set_disconnected();
417        self.core.set_stopped();
418
419        log::info!("Hyperliquid execution client stopped");
420        Ok(())
421    }
422
423    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
424        let order = self
425            .core
426            .cache()
427            .order(&cmd.client_order_id)
428            .cloned()
429            .ok_or_else(|| {
430                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
431            })?;
432
433        if order.is_closed() {
434            log::warn!("Cannot submit closed order {}", order.client_order_id());
435            return Ok(());
436        }
437
438        if let Err(e) = self.validate_order_submission(&order) {
439            self.emitter
440                .emit_order_denied(&order, &format!("Validation failed: {e}"));
441            return Err(e);
442        }
443
444        let http_client = self.http_client.clone();
445        let symbol = order.instrument_id().symbol.to_string();
446
447        // Validate asset index exists before marking as submitted
448        let asset = match http_client.get_asset_index(&symbol) {
449            Some(a) => a,
450            None => {
451                self.emitter
452                    .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
453                return Ok(());
454            }
455        };
456
457        // Validate order conversion before marking as submitted
458        let hyperliquid_order = match order_to_hyperliquid_request_with_asset(&order, asset) {
459            Ok(req) => req,
460            Err(e) => {
461                self.emitter
462                    .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
463                return Ok(());
464            }
465        };
466
467        // Cache cloid mapping before emitting submitted so WS handler
468        // can resolve order/fill reports back to this client_order_id
469        let cloid = Cloid::from_client_order_id(order.client_order_id());
470        self.ws_client
471            .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
472
473        self.emitter.emit_order_submitted(&order);
474
475        let builder_fee = HyperliquidExecBuilderFee {
476            address: NAUTILUS_BUILDER_FEE_ADDRESS.to_string(),
477            fee_tenths_bp: NAUTILUS_BUILDER_FEE_TENTHS_BP,
478        };
479
480        let emitter = self.emitter.clone();
481        let clock = self.clock;
482        let ws_client = self.ws_client.clone();
483        let cloid_hex = Ustr::from(&cloid.to_hex());
484
485        self.spawn_task("submit_order", async move {
486            let action = HyperliquidExecAction::Order {
487                orders: vec![hyperliquid_order],
488                grouping: HyperliquidExecGrouping::Na,
489                builder: Some(builder_fee),
490            };
491
492            match http_client.post_action_exec(&action).await {
493                Ok(response) => {
494                    if is_response_successful(&response) {
495                        log::info!("Order submitted successfully: {response:?}");
496                    } else {
497                        let error_msg = extract_error_message(&response);
498                        log::warn!("Order submission rejected by exchange: {error_msg}");
499                        let ts = clock.get_time_ns();
500                        emitter.emit_order_rejected(&order, &error_msg, ts, false);
501                        ws_client.remove_cloid_mapping(&cloid_hex);
502                    }
503                }
504                Err(e) => {
505                    // Don't reject on transport errors: the order may have
506                    // landed and WS events will drive the lifecycle. If it
507                    // didn't land, reconciliation on reconnect resolves it.
508                    log::error!("Order submission HTTP request failed: {e}");
509                }
510            }
511
512            Ok(())
513        });
514
515        Ok(())
516    }
517
518    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
519        log::debug!(
520            "Submitting order list with {} orders",
521            cmd.order_list.client_order_ids.len()
522        );
523
524        let http_client = self.http_client.clone();
525
526        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
527
528        // Validate all orders synchronously and collect valid ones
529        let mut valid_orders = Vec::new();
530        let mut hyperliquid_orders = Vec::new();
531
532        for order in &orders {
533            let symbol = order.instrument_id().symbol.to_string();
534            let asset = match http_client.get_asset_index(&symbol) {
535                Some(a) => a,
536                None => {
537                    self.emitter
538                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
539                    continue;
540                }
541            };
542
543            match order_to_hyperliquid_request_with_asset(order, asset) {
544                Ok(req) => {
545                    hyperliquid_orders.push(req);
546                    valid_orders.push(order.clone());
547                }
548                Err(e) => {
549                    self.emitter
550                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
551                }
552            }
553        }
554
555        if valid_orders.is_empty() {
556            log::warn!("No valid orders to submit in order list");
557            return Ok(());
558        }
559
560        for order in &valid_orders {
561            let cloid = Cloid::from_client_order_id(order.client_order_id());
562            self.ws_client
563                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
564            self.emitter.emit_order_submitted(order);
565        }
566
567        let builder_fee = HyperliquidExecBuilderFee {
568            address: NAUTILUS_BUILDER_FEE_ADDRESS.to_string(),
569            fee_tenths_bp: NAUTILUS_BUILDER_FEE_TENTHS_BP,
570        };
571
572        let emitter = self.emitter.clone();
573        let clock = self.clock;
574        let ws_client = self.ws_client.clone();
575        let cloid_hexes: Vec<Ustr> = valid_orders
576            .iter()
577            .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
578            .collect();
579
580        self.spawn_task("submit_order_list", async move {
581            let action = HyperliquidExecAction::Order {
582                orders: hyperliquid_orders,
583                grouping: HyperliquidExecGrouping::Na,
584                builder: Some(builder_fee),
585            };
586            match http_client.post_action_exec(&action).await {
587                Ok(response) => {
588                    if is_response_successful(&response) {
589                        log::info!("Order list submitted successfully: {response:?}");
590                    } else {
591                        // Hyperliquid batch endpoint rejects all-or-nothing
592                        let error_msg = extract_error_message(&response);
593                        log::warn!("Order list submission rejected by exchange: {error_msg}");
594                        let ts = clock.get_time_ns();
595                        for order in &valid_orders {
596                            emitter.emit_order_rejected(order, &error_msg, ts, false);
597                        }
598                        for cloid_hex in &cloid_hexes {
599                            ws_client.remove_cloid_mapping(cloid_hex);
600                        }
601                    }
602                }
603                Err(e) => {
604                    // Don't reject on transport errors: orders may have
605                    // landed and WS events will drive the lifecycle. If they
606                    // didn't land, reconciliation on reconnect resolves it.
607                    log::error!("Order list submission HTTP request failed: {e}");
608                }
609            }
610
611            Ok(())
612        });
613
614        Ok(())
615    }
616
617    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
618        log::debug!("Modifying order: {cmd:?}");
619
620        // Parse venue_order_id as u64
621        let venue_order_id = match cmd.venue_order_id {
622            Some(id) => id,
623            None => {
624                log::warn!("Cannot modify order: venue_order_id is None");
625                return Ok(());
626            }
627        };
628
629        let oid: u64 = match venue_order_id.as_str().parse() {
630            Ok(id) => id,
631            Err(e) => {
632                log::warn!("Failed to parse venue_order_id '{venue_order_id}' as u64: {e}");
633                return Ok(());
634            }
635        };
636
637        let http_client = self.http_client.clone();
638        let price = cmd.price;
639        let quantity = cmd.quantity;
640        let symbol = cmd.instrument_id.symbol.to_string();
641
642        self.spawn_task("modify_order", async move {
643            let asset = match http_client.get_asset_index(&symbol) {
644                Some(a) => a,
645                None => {
646                    log::warn!(
647                        "Asset index not found for symbol {symbol}, ensure instruments are loaded"
648                    );
649                    return Ok(());
650                }
651            };
652
653            // Build typed modify request with new price and/or quantity
654            let modify_request = HyperliquidExecModifyOrderRequest {
655                asset,
656                oid,
657                price: price.map(|p| (*p).into()),
658                size: quantity.map(|q| (*q).into()),
659                reduce_only: None,
660                kind: None,
661            };
662
663            let action = HyperliquidExecAction::Modify {
664                modify: modify_request,
665            };
666
667            match http_client.post_action_exec(&action).await {
668                Ok(response) => {
669                    if is_response_successful(&response) {
670                        log::info!("Order modified successfully: {response:?}");
671                        // Order update events will be generated from WebSocket updates
672                    } else {
673                        let error_msg = extract_error_message(&response);
674                        log::warn!("Order modification rejected by exchange: {error_msg}");
675                        // Order modify rejected events will be generated from WebSocket updates
676                    }
677                }
678                Err(e) => {
679                    log::warn!("Order modification HTTP request failed: {e}");
680                    // WebSocket reconciliation will handle recovery
681                }
682            }
683
684            Ok(())
685        });
686
687        Ok(())
688    }
689
690    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
691        log::debug!("Cancelling order: {cmd:?}");
692
693        let http_client = self.http_client.clone();
694        let client_order_id = cmd.client_order_id.to_string();
695        let symbol = cmd.instrument_id.symbol.to_string();
696
697        self.spawn_task("cancel_order", async move {
698            let asset = match http_client.get_asset_index(&symbol) {
699                Some(a) => a,
700                None => {
701                    log::warn!(
702                        "Asset index not found for symbol {symbol}, ensure instruments are loaded"
703                    );
704                    return Ok(());
705                }
706            };
707
708            let cancel_request =
709                client_order_id_to_cancel_request_with_asset(&client_order_id, asset);
710            let action = HyperliquidExecAction::CancelByCloid {
711                cancels: vec![cancel_request],
712            };
713
714            match http_client.post_action_exec(&action).await {
715                Ok(response) => {
716                    if is_response_successful(&response) {
717                        log::info!("Order cancelled successfully: {response:?}");
718                    } else {
719                        let error_msg = extract_error_message(&response);
720                        log::warn!("Order cancellation rejected by exchange: {error_msg}");
721                    }
722                }
723                Err(e) => {
724                    log::warn!("Order cancellation HTTP request failed: {e}");
725                }
726            }
727
728            Ok(())
729        });
730
731        Ok(())
732    }
733
734    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
735        log::debug!("Cancelling all orders: {cmd:?}");
736
737        let cache = self.core.cache();
738        let open_orders = cache.orders_open(
739            Some(&self.core.venue),
740            Some(&cmd.instrument_id),
741            None,
742            None,
743            Some(cmd.order_side),
744        );
745
746        if open_orders.is_empty() {
747            log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
748            return Ok(());
749        }
750
751        let symbol = cmd.instrument_id.symbol.to_string();
752        let client_order_ids: Vec<String> = open_orders
753            .iter()
754            .map(|o| o.client_order_id().to_string())
755            .collect();
756
757        let http_client = self.http_client.clone();
758
759        self.spawn_task("cancel_all_orders", async move {
760            let asset = match http_client.get_asset_index(&symbol) {
761                Some(a) => a,
762                None => {
763                    log::warn!(
764                        "Asset index not found for symbol {symbol}, ensure instruments are loaded"
765                    );
766                    return Ok(());
767                }
768            };
769
770            let cancel_requests: Vec<_> = client_order_ids
771                .iter()
772                .map(|id| client_order_id_to_cancel_request_with_asset(id, asset))
773                .collect();
774
775            if cancel_requests.is_empty() {
776                log::debug!("No valid cancel requests to send");
777                return Ok(());
778            }
779
780            let action = HyperliquidExecAction::CancelByCloid {
781                cancels: cancel_requests,
782            };
783            if let Err(e) = http_client.post_action_exec(&action).await {
784                log::warn!("Failed to send cancel all orders request: {e}");
785            }
786
787            Ok(())
788        });
789
790        Ok(())
791    }
792
793    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
794        log::debug!("Batch cancelling orders: {cmd:?}");
795
796        if cmd.cancels.is_empty() {
797            log::debug!("No orders to cancel in batch");
798            return Ok(());
799        }
800
801        let cancel_info: Vec<(String, String)> = cmd
802            .cancels
803            .iter()
804            .map(|c| {
805                (
806                    c.client_order_id.to_string(),
807                    c.instrument_id.symbol.to_string(),
808                )
809            })
810            .collect();
811
812        let http_client = self.http_client.clone();
813
814        self.spawn_task("batch_cancel_orders", async move {
815            let mut cancel_requests = Vec::new();
816
817            for (client_order_id, symbol) in &cancel_info {
818                let asset = match http_client.get_asset_index(symbol) {
819                    Some(a) => a,
820                    None => {
821                        log::warn!("Asset index not found for symbol {symbol}, skipping cancel");
822                        continue;
823                    }
824                };
825                cancel_requests.push(client_order_id_to_cancel_request_with_asset(
826                    client_order_id,
827                    asset,
828                ));
829            }
830
831            if cancel_requests.is_empty() {
832                log::warn!("No valid cancel requests in batch");
833                return Ok(());
834            }
835
836            let action = HyperliquidExecAction::CancelByCloid {
837                cancels: cancel_requests,
838            };
839            if let Err(e) = http_client.post_action_exec(&action).await {
840                log::warn!("Failed to send batch cancel orders request: {e}");
841            }
842
843            Ok(())
844        });
845
846        Ok(())
847    }
848
849    fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
850        log::debug!("Querying account: {cmd:?}");
851
852        // Use existing infrastructure to refresh account state
853        let runtime = get_runtime();
854        runtime.block_on(async {
855            if let Err(e) = self.refresh_account_state().await {
856                log::warn!("Failed to query account state: {e}");
857            }
858        });
859
860        Ok(())
861    }
862
863    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
864        log::debug!("Querying order: {cmd:?}");
865
866        // Get venue order ID from cache
867        let cache = self.core.cache();
868        let venue_order_id = cache.venue_order_id(&cmd.client_order_id);
869
870        let venue_order_id = match venue_order_id {
871            Some(oid) => *oid,
872            None => {
873                log::warn!(
874                    "No venue order ID found for client order {}",
875                    cmd.client_order_id
876                );
877                return Ok(());
878            }
879        };
880        drop(cache);
881
882        // Parse venue order ID to u64
883        let oid = match u64::from_str(venue_order_id.as_ref()) {
884            Ok(id) => id,
885            Err(e) => {
886                log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
887                return Ok(());
888            }
889        };
890
891        let account_address = self.get_account_address()?;
892
893        // Query order status via HTTP API
894        // Note: The WebSocket connection is the authoritative source for order updates,
895        // this is primarily for reconciliation or when WebSocket is unavailable
896        let http_client = self.http_client.clone();
897        let runtime = get_runtime();
898        runtime.spawn(async move {
899            match http_client.info_order_status(&account_address, oid).await {
900                Ok(status) => {
901                    log::debug!("Order status for oid {oid}: {status:?}");
902                }
903                Err(e) => {
904                    log::warn!("Failed to query order status for oid {oid}: {e}");
905                }
906            }
907        });
908
909        Ok(())
910    }
911
912    async fn connect(&mut self) -> anyhow::Result<()> {
913        if self.core.is_connected() {
914            return Ok(());
915        }
916
917        log::info!("Connecting Hyperliquid execution client");
918
919        // Ensure instruments are initialized
920        self.ensure_instruments_initialized_async().await?;
921
922        // Start WebSocket stream (connects and subscribes to user channels)
923        self.start_ws_stream().await?;
924
925        // Initialize account state and wait for it to be registered in cache
926        self.refresh_account_state().await?;
927        self.await_account_registered(30.0).await?;
928
929        self.core.set_connected();
930
931        log::info!("Connected: client_id={}", self.core.client_id);
932        Ok(())
933    }
934
935    async fn disconnect(&mut self) -> anyhow::Result<()> {
936        if self.core.is_disconnected() {
937            return Ok(());
938        }
939
940        log::info!("Disconnecting Hyperliquid execution client");
941
942        // Disconnect WebSocket
943        self.ws_client.disconnect().await?;
944
945        // Abort any pending tasks
946        self.abort_pending_tasks();
947
948        self.core.set_disconnected();
949
950        log::info!("Disconnected: client_id={}", self.core.client_id);
951        Ok(())
952    }
953
954    async fn generate_order_status_report(
955        &self,
956        _cmd: &GenerateOrderStatusReport,
957    ) -> anyhow::Result<Option<OrderStatusReport>> {
958        // NOTE: Single order status report generation requires instrument cache integration.
959        // The HTTP client methods and parsing functions are implemented and ready to use.
960        // When implemented: query via info_order_status(), parse with parse_order_status_report_from_basic().
961        log::warn!("generate_order_status_report not yet fully implemented");
962        Ok(None)
963    }
964
965    async fn generate_order_status_reports(
966        &self,
967        cmd: &GenerateOrderStatusReports,
968    ) -> anyhow::Result<Vec<OrderStatusReport>> {
969        let account_address = self.get_account_address()?;
970
971        let reports = self
972            .http_client
973            .request_order_status_reports(&account_address, cmd.instrument_id)
974            .await
975            .context("failed to generate order status reports")?;
976
977        // Filter by open_only if specified
978        let reports = if cmd.open_only {
979            reports
980                .into_iter()
981                .filter(|r| r.order_status.is_open())
982                .collect()
983        } else {
984            reports
985        };
986
987        // Filter by time range if specified
988        let reports = match (cmd.start, cmd.end) {
989            (Some(start), Some(end)) => reports
990                .into_iter()
991                .filter(|r| r.ts_last >= start && r.ts_last <= end)
992                .collect(),
993            (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
994            (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
995            (None, None) => reports,
996        };
997
998        log::info!("Generated {} order status reports", reports.len());
999        Ok(reports)
1000    }
1001
1002    async fn generate_fill_reports(
1003        &self,
1004        cmd: GenerateFillReports,
1005    ) -> anyhow::Result<Vec<FillReport>> {
1006        let account_address = self.get_account_address()?;
1007
1008        let reports = self
1009            .http_client
1010            .request_fill_reports(&account_address, cmd.instrument_id)
1011            .await
1012            .context("failed to generate fill reports")?;
1013
1014        // Filter by time range if specified
1015        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1016            reports
1017                .into_iter()
1018                .filter(|r| r.ts_event >= start && r.ts_event <= end)
1019                .collect()
1020        } else if let Some(start) = cmd.start {
1021            reports
1022                .into_iter()
1023                .filter(|r| r.ts_event >= start)
1024                .collect()
1025        } else if let Some(end) = cmd.end {
1026            reports.into_iter().filter(|r| r.ts_event <= end).collect()
1027        } else {
1028            reports
1029        };
1030
1031        log::info!("Generated {} fill reports", reports.len());
1032        Ok(reports)
1033    }
1034
1035    async fn generate_position_status_reports(
1036        &self,
1037        cmd: &GeneratePositionStatusReports,
1038    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1039        let account_address = self.get_account_address()?;
1040
1041        let reports = self
1042            .http_client
1043            .request_position_status_reports(&account_address, cmd.instrument_id)
1044            .await
1045            .context("failed to generate position status reports")?;
1046
1047        log::info!("Generated {} position status reports", reports.len());
1048        Ok(reports)
1049    }
1050
1051    async fn generate_mass_status(
1052        &self,
1053        lookback_mins: Option<u64>,
1054    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1055        let ts_init = self.clock.get_time_ns();
1056
1057        let order_cmd = GenerateOrderStatusReports::new(
1058            UUID4::new(),
1059            ts_init,
1060            true, // open_only
1061            None,
1062            None,
1063            None,
1064            None,
1065            None,
1066        );
1067        let fill_cmd =
1068            GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1069        let position_cmd =
1070            GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1071
1072        let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1073        let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1074        let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1075
1076        // Apply lookback filter to fills only (positions are current state,
1077        // and open orders must always be included for correct reconciliation)
1078        if let Some(mins) = lookback_mins {
1079            let cutoff_ns = ts_init
1080                .as_u64()
1081                .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1082            let cutoff = UnixNanos::from(cutoff_ns);
1083
1084            fill_reports.retain(|r| r.ts_event >= cutoff);
1085        }
1086
1087        let mut mass_status = ExecutionMassStatus::new(
1088            self.core.client_id,
1089            self.core.account_id,
1090            self.core.venue,
1091            ts_init,
1092            None,
1093        );
1094        mass_status.add_order_reports(order_reports);
1095        mass_status.add_fill_reports(fill_reports);
1096        mass_status.add_position_reports(position_reports);
1097
1098        log::info!(
1099            "Generated mass status: {} orders, {} fills, {} positions",
1100            mass_status.order_reports().len(),
1101            mass_status.fill_reports().len(),
1102            mass_status.position_reports().len(),
1103        );
1104
1105        Ok(Some(mass_status))
1106    }
1107}
1108
1109impl HyperliquidExecutionClient {
1110    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1111        {
1112            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1113            if handle_guard.is_some() {
1114                return Ok(());
1115            }
1116        }
1117
1118        let user_address = self.get_user_address()?;
1119
1120        // Use vault address for WS subscriptions when vault trading,
1121        // otherwise order/fill updates for the vault will be missed
1122        let subscription_address = self
1123            .config
1124            .vault_address
1125            .as_ref()
1126            .unwrap_or(&user_address)
1127            .clone();
1128
1129        let mut ws_client = self.ws_client.clone();
1130
1131        let instruments = self
1132            .http_client
1133            .request_instruments()
1134            .await
1135            .unwrap_or_default();
1136
1137        for instrument in instruments {
1138            ws_client.cache_instrument(instrument);
1139        }
1140
1141        // Connect and subscribe before spawning the event loop
1142        ws_client.connect().await?;
1143        ws_client
1144            .subscribe_order_updates(&subscription_address)
1145            .await?;
1146        ws_client
1147            .subscribe_user_events(&subscription_address)
1148            .await?;
1149        log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1150
1151        let runtime = get_runtime();
1152        let handle = runtime.spawn(async move {
1153            // Orders with FILLED status awaiting their final fill
1154            let mut pending_filled: AHashSet<ClientOrderId> = AHashSet::new();
1155
1156            loop {
1157                let event = ws_client.next_event().await;
1158
1159                match event {
1160                    Some(msg) => {
1161                        match msg {
1162                            NautilusWsMessage::ExecutionReports(reports) => {
1163                                let mut terminal_ids: Vec<ClientOrderId> = Vec::new();
1164                                let mut filled_ids: Vec<ClientOrderId> = Vec::new();
1165                                let mut fill_ids: Vec<ClientOrderId> = Vec::new();
1166
1167                                for report in &reports {
1168                                    match report {
1169                                        ExecutionReport::Order(order_report) => {
1170                                            if let Some(id) = order_report.client_order_id
1171                                                && !order_report.order_status.is_open()
1172                                            {
1173                                                if order_report.order_status
1174                                                    == OrderStatus::Filled
1175                                                {
1176                                                    filled_ids.push(id);
1177                                                } else {
1178                                                    terminal_ids.push(id);
1179                                                }
1180                                            }
1181                                        }
1182                                        ExecutionReport::Fill(fill_report) => {
1183                                            if let Some(id) = fill_report.client_order_id {
1184                                                fill_ids.push(id);
1185                                            }
1186                                        }
1187                                    }
1188                                }
1189
1190                                for report in reports {
1191                                    dispatch_execution_report(report);
1192                                }
1193
1194                                for id in terminal_ids {
1195                                    let cloid = Cloid::from_client_order_id(id);
1196                                    ws_client.remove_cloid_mapping(&Ustr::from(
1197                                        &cloid.to_hex(),
1198                                    ));
1199                                }
1200
1201                                // Track FILLED status for deferred cleanup
1202                                for id in filled_ids {
1203                                    pending_filled.insert(id);
1204                                }
1205
1206                                // Clean up only after FILLED has been observed
1207                                for id in fill_ids {
1208                                    if pending_filled.remove(&id) {
1209                                        let cloid = Cloid::from_client_order_id(id);
1210                                        ws_client.remove_cloid_mapping(&Ustr::from(
1211                                            &cloid.to_hex(),
1212                                        ));
1213                                    }
1214                                }
1215                            }
1216                            NautilusWsMessage::Reconnected => {
1217                                log::info!("WebSocket reconnected, resubscribing to user channels");
1218
1219                                if let Err(e) = ws_client
1220                                    .subscribe_order_updates(&subscription_address)
1221                                    .await
1222                                {
1223                                    log::error!(
1224                                        "Failed to resubscribe to order updates after reconnect: {e}"
1225                                    );
1226                                }
1227
1228                                if let Err(e) = ws_client
1229                                    .subscribe_user_events(&subscription_address)
1230                                    .await
1231                                {
1232                                    log::error!(
1233                                        "Failed to resubscribe to user events after reconnect: {e}"
1234                                    );
1235                                }
1236
1237                                log::info!("Resubscribed to execution channels");
1238                            }
1239                            NautilusWsMessage::Error(e) => {
1240                                log::error!("WebSocket error: {e}");
1241                            }
1242                            // Handled by data client
1243                            NautilusWsMessage::Trades(_)
1244                            | NautilusWsMessage::Quote(_)
1245                            | NautilusWsMessage::Deltas(_)
1246                            | NautilusWsMessage::Candle(_)
1247                            | NautilusWsMessage::MarkPrice(_)
1248                            | NautilusWsMessage::IndexPrice(_)
1249                            | NautilusWsMessage::FundingRate(_) => {}
1250                        }
1251                    }
1252                    None => {
1253                        log::warn!("WebSocket next_event returned None");
1254                        break;
1255                    }
1256                }
1257            }
1258        });
1259
1260        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1261        log::info!("Hyperliquid WebSocket execution stream started");
1262        Ok(())
1263    }
1264}
1265
1266fn dispatch_execution_report(report: ExecutionReport) {
1267    let sender = get_exec_event_sender();
1268    match report {
1269        ExecutionReport::Order(order_report) => {
1270            let exec_report = NautilusExecutionReport::Order(Box::new(order_report));
1271            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1272                log::warn!("Failed to send order status report: {e}");
1273            }
1274        }
1275        ExecutionReport::Fill(fill_report) => {
1276            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1277            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1278                log::warn!("Failed to send fill report: {e}");
1279            }
1280        }
1281    }
1282}