nautilus_hyperliquid/execution/
mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{cell::Ref, str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use nautilus_common::{
22    clock::Clock,
23    messages::{
24        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
25        execution::{
26            BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
27            SubmitOrder, SubmitOrderList,
28        },
29    },
30    runner::get_exec_event_sender,
31    runtime::get_runtime,
32};
33use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
34use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
35use nautilus_live::execution::LiveExecutionClientExt;
36use nautilus_model::{
37    accounts::AccountAny,
38    enums::{OmsType, OrderType},
39    identifiers::{AccountId, ClientId, Venue},
40    orders::{Order, any::OrderAny},
41    types::{AccountBalance, MarginBalance},
42};
43use serde_json;
44use tokio::task::JoinHandle;
45
46use crate::{
47    common::{
48        consts::HYPERLIQUID_VENUE,
49        credential::Secrets,
50        parse::{
51            client_order_id_to_cancel_request, extract_error_message, is_response_successful,
52            order_any_to_hyperliquid_request, orders_to_hyperliquid_requests,
53        },
54    },
55    config::HyperliquidExecClientConfig,
56    http::{client::HyperliquidHttpClient, query::ExchangeAction},
57    websocket::{
58        ExecutionReport,
59        client::HyperliquidWebSocketClient,
60        messages::HyperliquidWsMessage as HyperliquidWsMsg,
61        parse::{parse_ws_fill_report, parse_ws_order_status_report},
62    },
63};
64
65#[derive(Debug)]
66pub struct HyperliquidExecutionClient {
67    core: ExecutionClientCore,
68    config: HyperliquidExecClientConfig,
69    http_client: HyperliquidHttpClient,
70    ws_client: HyperliquidWebSocketClient,
71    started: bool,
72    connected: bool,
73    instruments_initialized: bool,
74    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
75    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
76}
77
78impl HyperliquidExecutionClient {
79    /// Returns a reference to the configuration.
80    pub fn config(&self) -> &HyperliquidExecClientConfig {
81        &self.config
82    }
83
84    /// Validates order before submission to catch issues early.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the order cannot be submitted to Hyperliquid.
89    ///
90    /// # Supported Order Types
91    ///
92    /// - `Market`: Standard market orders
93    /// - `Limit`: Limit orders with GTC/IOC/ALO time-in-force
94    /// - `StopMarket`: Stop loss / protective stop with market execution
95    /// - `StopLimit`: Stop loss / protective stop with limit price
96    /// - `MarketIfTouched`: Profit taking / entry order with market execution
97    /// - `LimitIfTouched`: Profit taking / entry order with limit price
98    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
99        // Check if instrument symbol is supported
100        let symbol = order.instrument_id().symbol.to_string();
101        if !symbol.ends_with("-USD") {
102            anyhow::bail!("Unsupported instrument symbol format for Hyperliquid: {symbol}");
103        }
104
105        // Check if order type is supported
106        match order.order_type() {
107            OrderType::Market
108            | OrderType::Limit
109            | OrderType::StopMarket
110            | OrderType::StopLimit
111            | OrderType::MarketIfTouched
112            | OrderType::LimitIfTouched => {}
113            _ => anyhow::bail!(
114                "Unsupported order type for Hyperliquid: {:?}",
115                order.order_type()
116            ),
117        }
118
119        // Check if conditional orders have trigger price
120        if matches!(
121            order.order_type(),
122            OrderType::StopMarket
123                | OrderType::StopLimit
124                | OrderType::MarketIfTouched
125                | OrderType::LimitIfTouched
126        ) && order.trigger_price().is_none()
127        {
128            anyhow::bail!(
129                "Conditional orders require a trigger price for Hyperliquid: {:?}",
130                order.order_type()
131            );
132        }
133
134        // Check if limit-based orders have price
135        if matches!(
136            order.order_type(),
137            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
138        ) && order.price().is_none()
139        {
140            anyhow::bail!(
141                "Limit orders require a limit price for Hyperliquid: {:?}",
142                order.order_type()
143            );
144        }
145
146        Ok(())
147    }
148
149    /// Creates a new [`HyperliquidExecutionClient`].
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if either the HTTP or WebSocket client fail to construct.
154    pub fn new(
155        core: ExecutionClientCore,
156        config: HyperliquidExecClientConfig,
157    ) -> anyhow::Result<Self> {
158        if !config.has_credentials() {
159            anyhow::bail!("Hyperliquid execution client requires private key");
160        }
161
162        let secrets = Secrets::from_json(&format!(
163            r#"{{"privateKey": "{}", "isTestnet": {}}}"#,
164            config.private_key, config.is_testnet
165        ))
166        .context("failed to create secrets from private key")?;
167
168        let http_client =
169            HyperliquidHttpClient::with_credentials(&secrets, Some(config.http_timeout_secs));
170
171        // Create WebSocket client (will connect when needed)
172        let ws_client = HyperliquidWebSocketClient::new(
173            crate::common::consts::ws_url(config.is_testnet).to_string(),
174        );
175
176        Ok(Self {
177            core,
178            config,
179            http_client,
180            ws_client,
181            started: false,
182            connected: false,
183            instruments_initialized: false,
184            pending_tasks: Mutex::new(Vec::new()),
185            ws_stream_handle: Mutex::new(None),
186        })
187    }
188
189    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
190        if self.instruments_initialized {
191            return Ok(());
192        }
193
194        let instruments = self
195            .http_client
196            .request_instruments()
197            .await
198            .context("failed to request Hyperliquid instruments")?;
199
200        if instruments.is_empty() {
201            tracing::warn!(
202                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
203            );
204        } else {
205            tracing::info!("Initialized {} instruments", instruments.len());
206        }
207
208        self.instruments_initialized = true;
209        Ok(())
210    }
211
212    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
213        if self.instruments_initialized {
214            return Ok(());
215        }
216
217        let runtime = get_runtime();
218        runtime.block_on(self.ensure_instruments_initialized_async())
219    }
220
221    async fn refresh_account_state(&self) -> anyhow::Result<()> {
222        // Get account information from Hyperliquid using the user address
223        // We need to derive the user address from the private key in the config
224        let user_address = self.get_user_address()?;
225
226        // Use vault address if configured, otherwise use user address
227        let account_address = self.config.vault_address.as_ref().unwrap_or(&user_address);
228
229        // Query clearinghouseState endpoint to get balances and margin info
230        let clearinghouse_state = self
231            .http_client
232            .info_clearinghouse_state(account_address)
233            .await
234            .context("Failed to fetch clearinghouse state")?;
235
236        // Deserialize the response
237        let state: crate::http::models::ClearinghouseState =
238            serde_json::from_value(clearinghouse_state)
239                .context("Failed to deserialize clearinghouse state")?;
240
241        tracing::debug!(
242            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
243            state.cross_margin_summary,
244            state.asset_positions.len()
245        );
246
247        // Parse balances and margins from cross margin summary
248        if let Some(ref cross_margin_summary) = state.cross_margin_summary {
249            let (balances, margins) =
250                crate::common::parse::parse_account_balances_and_margins(cross_margin_summary)
251                    .context("Failed to parse account balances and margins")?;
252
253            let ts_event = if let Some(time_ms) = state.time {
254                nautilus_core::UnixNanos::from(time_ms * 1_000_000)
255            } else {
256                nautilus_core::time::get_atomic_clock_realtime().get_time_ns()
257            };
258
259            // Generate account state event
260            self.core.generate_account_state(
261                balances, margins, true, // reported
262                ts_event,
263            )?;
264
265            tracing::info!("Account state updated successfully");
266        } else {
267            tracing::warn!("No cross margin summary in clearinghouse state");
268        }
269
270        Ok(())
271    }
272
273    fn get_user_address(&self) -> anyhow::Result<String> {
274        // Use the HTTP client's get_user_address() method which properly derives
275        // the address from the private key using the signer's address() method
276        let address = self
277            .http_client
278            .get_user_address()
279            .context("Failed to get user address from HTTP client")?;
280
281        Ok(address)
282    }
283
284    fn spawn_task<F>(&self, description: &'static str, fut: F)
285    where
286        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
287    {
288        let runtime = get_runtime();
289        let handle = runtime.spawn(async move {
290            if let Err(e) = fut.await {
291                tracing::warn!("{description} failed: {e:?}");
292            }
293        });
294
295        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
296        tasks.retain(|handle| !handle.is_finished());
297        tasks.push(handle);
298    }
299
300    fn abort_pending_tasks(&self) {
301        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
302        for handle in tasks.drain(..) {
303            handle.abort();
304        }
305    }
306
307    fn update_account_state(&self) -> anyhow::Result<()> {
308        let runtime = get_runtime();
309        runtime.block_on(self.refresh_account_state())
310    }
311}
312
313impl ExecutionClient for HyperliquidExecutionClient {
314    fn is_connected(&self) -> bool {
315        self.connected
316    }
317
318    fn client_id(&self) -> ClientId {
319        self.core.client_id
320    }
321
322    fn account_id(&self) -> AccountId {
323        self.core.account_id
324    }
325
326    fn venue(&self) -> Venue {
327        *HYPERLIQUID_VENUE
328    }
329
330    fn oms_type(&self) -> OmsType {
331        self.core.oms_type
332    }
333
334    fn get_account(&self) -> Option<AccountAny> {
335        self.core.get_account()
336    }
337
338    fn generate_account_state(
339        &self,
340        balances: Vec<AccountBalance>,
341        margins: Vec<MarginBalance>,
342        reported: bool,
343        ts_event: UnixNanos,
344    ) -> anyhow::Result<()> {
345        self.core
346            .generate_account_state(balances, margins, reported, ts_event)
347    }
348
349    fn start(&mut self) -> anyhow::Result<()> {
350        if self.started {
351            return Ok(());
352        }
353
354        tracing::info!("Starting Hyperliquid execution client");
355
356        // Ensure instruments are initialized
357        self.ensure_instruments_initialized()?;
358
359        // Initialize account state
360        if let Err(e) = self.update_account_state() {
361            tracing::warn!("Failed to initialize account state: {}", e);
362        }
363
364        self.connected = true;
365        self.started = true;
366
367        // Start WebSocket stream for execution updates
368        if let Err(e) = self.start_ws_stream() {
369            tracing::warn!("Failed to start WebSocket stream: {}", e);
370        }
371
372        tracing::info!("Hyperliquid execution client started");
373        Ok(())
374    }
375    fn stop(&mut self) -> anyhow::Result<()> {
376        if !self.started {
377            return Ok(());
378        }
379
380        tracing::info!("Stopping Hyperliquid execution client");
381
382        // Stop WebSocket stream
383        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
384            handle.abort();
385        }
386
387        // Abort any pending tasks
388        self.abort_pending_tasks();
389
390        // Disconnect WebSocket
391        if self.connected {
392            let runtime = get_runtime();
393            runtime.block_on(async {
394                if let Err(e) = self.ws_client.disconnect().await {
395                    tracing::warn!("Error disconnecting WebSocket client: {e}");
396                }
397            });
398        }
399
400        self.connected = false;
401        self.started = false;
402
403        tracing::info!("Hyperliquid execution client stopped");
404        Ok(())
405    }
406
407    fn submit_order(&self, command: &SubmitOrder) -> anyhow::Result<()> {
408        let order = &command.order;
409
410        if order.is_closed() {
411            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
412            return Ok(());
413        }
414
415        // Validate order before submission
416        if let Err(e) = self.validate_order_submission(order) {
417            self.core.generate_order_rejected(
418                order.strategy_id(),
419                order.instrument_id(),
420                order.client_order_id(),
421                &format!("validation-error: {e}"),
422                command.ts_init,
423                false,
424            );
425            return Err(e);
426        }
427
428        self.core.generate_order_submitted(
429            order.strategy_id(),
430            order.instrument_id(),
431            order.client_order_id(),
432            command.ts_init,
433        );
434
435        let http_client = self.http_client.clone();
436        let order_clone = order.clone();
437
438        self.spawn_task("submit_order", async move {
439            match order_any_to_hyperliquid_request(&order_clone) {
440                Ok(hyperliquid_order) => {
441                    // Create exchange action for order placement with typed struct
442                    let action = ExchangeAction::order(vec![hyperliquid_order]);
443
444                    match http_client.post_action(&action).await {
445                        Ok(response) => {
446                            if is_response_successful(&response) {
447                                tracing::info!("Order submitted successfully: {:?}", response);
448                                // Order acceptance/rejection events will be generated from WebSocket updates
449                                // which provide the venue_order_id and definitive status
450                            } else {
451                                let error_msg = extract_error_message(&response);
452                                tracing::warn!(
453                                    "Order submission rejected by exchange: {}",
454                                    error_msg
455                                );
456                                // Order rejection event will be generated from WebSocket updates
457                            }
458                        }
459                        Err(e) => {
460                            tracing::warn!("Order submission HTTP request failed: {e}");
461                            // WebSocket reconnection and order reconciliation will handle recovery
462                        }
463                    }
464                }
465                Err(e) => {
466                    tracing::warn!("Failed to convert order to Hyperliquid format: {e}");
467                    // This indicates a client-side bug or unsupported order configuration
468                }
469            }
470
471            Ok(())
472        });
473
474        Ok(())
475    }
476
477    fn submit_order_list(&self, command: &SubmitOrderList) -> anyhow::Result<()> {
478        tracing::debug!(
479            "Submitting order list with {} orders",
480            command.order_list.orders.len()
481        );
482
483        let http_client = self.http_client.clone();
484        let orders: Vec<OrderAny> = command.order_list.orders.clone();
485
486        // Generate submitted events for all orders
487        for order in &orders {
488            self.core.generate_order_submitted(
489                order.strategy_id(),
490                order.instrument_id(),
491                order.client_order_id(),
492                command.ts_init,
493            );
494        }
495
496        self.spawn_task("submit_order_list", async move {
497            // Convert all orders to Hyperliquid format
498            let order_refs: Vec<&OrderAny> = orders.iter().collect();
499            match orders_to_hyperliquid_requests(&order_refs) {
500                Ok(hyperliquid_orders) => {
501                    // Create exchange action for order placement with typed struct
502                    let action = ExchangeAction::order(hyperliquid_orders);
503                    match http_client.post_action(&action).await {
504                        Ok(response) => {
505                            if is_response_successful(&response) {
506                                tracing::info!("Order list submitted successfully: {:?}", response);
507                                // Order acceptance/rejection events will be generated from WebSocket updates
508                            } else {
509                                let error_msg = extract_error_message(&response);
510                                tracing::warn!(
511                                    "Order list submission rejected by exchange: {}",
512                                    error_msg
513                                );
514                                // Individual order rejection events will be generated from WebSocket updates
515                            }
516                        }
517                        Err(e) => {
518                            tracing::warn!("Order list submission HTTP request failed: {e}");
519                            // WebSocket reconciliation will handle recovery
520                        }
521                    }
522                }
523                Err(e) => {
524                    tracing::warn!("Failed to convert order list to Hyperliquid format: {e}");
525                }
526            }
527
528            Ok(())
529        });
530
531        Ok(())
532    }
533
534    fn modify_order(&self, command: &ModifyOrder) -> anyhow::Result<()> {
535        tracing::debug!("Modifying order: {:?}", command);
536
537        // Parse venue_order_id as u64
538        let oid: u64 = match command.venue_order_id.as_str().parse() {
539            Ok(id) => id,
540            Err(e) => {
541                tracing::warn!(
542                    "Failed to parse venue_order_id '{}' as u64: {}",
543                    command.venue_order_id,
544                    e
545                );
546                return Ok(());
547            }
548        };
549
550        let http_client = self.http_client.clone();
551        let price = command.price;
552        let quantity = command.quantity;
553        let symbol = command.instrument_id.symbol.to_string();
554
555        self.spawn_task("modify_order", async move {
556            use crate::{
557                common::parse::extract_asset_id_from_symbol,
558                http::models::HyperliquidExecModifyOrderRequest,
559            };
560
561            // Extract asset ID from instrument symbol
562            let asset = match extract_asset_id_from_symbol(&symbol) {
563                Ok(asset) => asset,
564                Err(e) => {
565                    tracing::warn!("Failed to extract asset ID from symbol {}: {}", symbol, e);
566                    return Ok(());
567                }
568            };
569
570            // Build typed modify request with new price and/or quantity
571            let modify_request = HyperliquidExecModifyOrderRequest {
572                asset,
573                oid,
574                price: price.map(|p| (*p).into()),
575                size: quantity.map(|q| (*q).into()),
576                reduce_only: None,
577                kind: None,
578            };
579
580            let action = ExchangeAction::modify(oid, modify_request);
581
582            match http_client.post_action(&action).await {
583                Ok(response) => {
584                    if is_response_successful(&response) {
585                        tracing::info!("Order modified successfully: {:?}", response);
586                        // Order update events will be generated from WebSocket updates
587                    } else {
588                        let error_msg = extract_error_message(&response);
589                        tracing::warn!("Order modification rejected by exchange: {}", error_msg);
590                        // Order modify rejected events will be generated from WebSocket updates
591                    }
592                }
593                Err(e) => {
594                    tracing::warn!("Order modification HTTP request failed: {e}");
595                    // WebSocket reconciliation will handle recovery
596                }
597            }
598
599            Ok(())
600        });
601
602        Ok(())
603    }
604
605    fn cancel_order(&self, command: &CancelOrder) -> anyhow::Result<()> {
606        tracing::debug!("Cancelling order: {:?}", command);
607
608        let http_client = self.http_client.clone();
609        let client_order_id = command.client_order_id.to_string();
610        let symbol = command.instrument_id.symbol.to_string();
611
612        self.spawn_task("cancel_order", async move {
613            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
614                Ok(cancel_request) => {
615                    // Create exchange action for order cancellation with typed struct
616                    let action = ExchangeAction::cancel_by_cloid(vec![cancel_request]);
617                    match http_client.post_action(&action).await {
618                        Ok(response) => {
619                            if is_response_successful(&response) {
620                                tracing::info!("Order cancelled successfully: {:?}", response);
621                                // Order cancelled events will be generated from WebSocket updates
622                                // which provide definitive confirmation and venue_order_id
623                            } else {
624                                let error_msg = extract_error_message(&response);
625                                tracing::warn!(
626                                    "Order cancellation rejected by exchange: {}",
627                                    error_msg
628                                );
629                                // Order cancel rejected events will be generated from WebSocket updates
630                            }
631                        }
632                        Err(e) => {
633                            tracing::warn!("Order cancellation HTTP request failed: {e}");
634                            // WebSocket reconnection and reconciliation will handle recovery
635                        }
636                    }
637                }
638                Err(e) => {
639                    tracing::warn!(
640                        "Failed to convert order to Hyperliquid cancel format: {:?}",
641                        e
642                    );
643                }
644            }
645
646            Ok(())
647        });
648
649        Ok(())
650    }
651
652    fn cancel_all_orders(&self, command: &CancelAllOrders) -> anyhow::Result<()> {
653        tracing::debug!("Cancelling all orders: {:?}", command);
654
655        // Query cache for all open orders matching the instrument and side
656        let cache = self.core.cache().borrow();
657        let open_orders = cache.orders_open(
658            Some(&self.core.venue),
659            Some(&command.instrument_id),
660            None,
661            Some(command.order_side),
662        );
663
664        if open_orders.is_empty() {
665            tracing::debug!("No open orders to cancel for {:?}", command.instrument_id);
666            return Ok(());
667        }
668
669        // Convert orders to cancel requests
670        let mut cancel_requests = Vec::new();
671        for order in open_orders {
672            let client_order_id = order.client_order_id().to_string();
673            let symbol = command.instrument_id.symbol.to_string();
674
675            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
676                Ok(req) => cancel_requests.push(req),
677                Err(e) => {
678                    tracing::warn!(
679                        "Failed to convert order {} to cancel request: {}",
680                        client_order_id,
681                        e
682                    );
683                    continue;
684                }
685            }
686        }
687
688        if cancel_requests.is_empty() {
689            tracing::debug!("No valid cancel requests to send");
690            return Ok(());
691        }
692
693        // Create exchange action for cancellation with typed struct
694        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
695
696        // Send cancel request via HTTP API
697        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
698        let http_client = self.http_client.clone();
699        let runtime = get_runtime();
700        runtime.spawn(async move {
701            if let Err(e) = http_client.post_action(&action).await {
702                tracing::warn!("Failed to send cancel all orders request: {}", e);
703            }
704        });
705
706        Ok(())
707    }
708
709    fn batch_cancel_orders(&self, command: &BatchCancelOrders) -> anyhow::Result<()> {
710        tracing::debug!("Batch cancelling orders: {:?}", command);
711
712        if command.cancels.is_empty() {
713            tracing::debug!("No orders to cancel in batch");
714            return Ok(());
715        }
716
717        // Convert each CancelOrder to a cancel request
718        let mut cancel_requests = Vec::new();
719        for cancel_cmd in &command.cancels {
720            let client_order_id = cancel_cmd.client_order_id.to_string();
721            let symbol = cancel_cmd.instrument_id.symbol.to_string();
722
723            match client_order_id_to_cancel_request(&client_order_id, &symbol) {
724                Ok(req) => cancel_requests.push(req),
725                Err(e) => {
726                    tracing::warn!(
727                        "Failed to convert order {} to cancel request: {}",
728                        client_order_id,
729                        e
730                    );
731                    continue;
732                }
733            }
734        }
735
736        if cancel_requests.is_empty() {
737            tracing::warn!("No valid cancel requests in batch");
738            return Ok(());
739        }
740
741        let action = ExchangeAction::cancel_by_cloid(cancel_requests);
742
743        // Send batch cancel request via HTTP API
744        // Note: The WebSocket connection will authoritatively handle the OrderCancelled events
745        let http_client = self.http_client.clone();
746        let runtime = get_runtime();
747        runtime.spawn(async move {
748            if let Err(e) = http_client.post_action(&action).await {
749                tracing::warn!("Failed to send batch cancel orders request: {}", e);
750            }
751        });
752
753        Ok(())
754    }
755
756    fn query_account(&self, command: &QueryAccount) -> anyhow::Result<()> {
757        tracing::debug!("Querying account: {:?}", command);
758
759        // Use existing infrastructure to refresh account state
760        let runtime = get_runtime();
761        runtime.block_on(async {
762            if let Err(e) = self.refresh_account_state().await {
763                tracing::warn!("Failed to query account state: {}", e);
764            }
765        });
766
767        Ok(())
768    }
769
770    fn query_order(&self, command: &QueryOrder) -> anyhow::Result<()> {
771        tracing::debug!("Querying order: {:?}", command);
772
773        // Get venue order ID from cache
774        let cache = self.core.cache().borrow();
775        let venue_order_id = cache.venue_order_id(&command.client_order_id);
776
777        let venue_order_id = match venue_order_id {
778            Some(oid) => *oid,
779            None => {
780                tracing::warn!(
781                    "No venue order ID found for client order {}",
782                    command.client_order_id
783                );
784                return Ok(());
785            }
786        };
787        drop(cache);
788
789        // Parse venue order ID to u64
790        let oid = match u64::from_str(venue_order_id.as_ref()) {
791            Ok(id) => id,
792            Err(e) => {
793                tracing::warn!("Failed to parse venue order ID {}: {}", venue_order_id, e);
794                return Ok(());
795            }
796        };
797
798        // Get user address for the query
799        let user_address = self.get_user_address()?;
800
801        // Query order status via HTTP API
802        // Note: The WebSocket connection is the authoritative source for order updates,
803        // this is primarily for reconciliation or when WebSocket is unavailable
804        let http_client = self.http_client.clone();
805        let runtime = get_runtime();
806        runtime.spawn(async move {
807            match http_client.info_order_status(&user_address, oid).await {
808                Ok(status) => {
809                    tracing::debug!("Order status for oid {}: {:?}", oid, status);
810                }
811                Err(e) => {
812                    tracing::warn!("Failed to query order status for oid {}: {}", oid, e);
813                }
814            }
815        });
816
817        Ok(())
818    }
819}
820
821////////////////////////////////////////////////////////////////////////////////
822// LiveExecutionClient Implementation
823////////////////////////////////////////////////////////////////////////////////
824
825use async_trait::async_trait;
826use nautilus_common::messages::execution::{
827    GenerateFillReports, GenerateOrderStatusReport, GeneratePositionReports,
828};
829use nautilus_execution::client::LiveExecutionClient;
830use nautilus_model::reports::{
831    ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport,
832};
833
834#[async_trait(?Send)]
835impl LiveExecutionClient for HyperliquidExecutionClient {
836    async fn connect(&mut self) -> anyhow::Result<()> {
837        if self.connected {
838            return Ok(());
839        }
840
841        tracing::info!("Connecting Hyperliquid execution client");
842
843        // Ensure instruments are initialized
844        self.ensure_instruments_initialized_async().await?;
845
846        // Connect WebSocket client
847        let url = crate::common::consts::ws_url(self.config.is_testnet);
848        self.ws_client = HyperliquidWebSocketClient::connect(url).await?;
849
850        // Subscribe to user-specific order updates and fills
851        let user_address = self.get_user_address()?;
852        self.ws_client
853            .subscribe_all_user_channels(&user_address)
854            .await?;
855
856        // Initialize account state
857        self.refresh_account_state().await?;
858
859        // Note: Order reconciliation is handled by the execution engine
860        // which will call generate_order_status_reports() after connection
861
862        self.connected = true;
863        self.core.set_connected(true);
864
865        tracing::info!(
866            "Hyperliquid execution client {} connected",
867            self.core.client_id
868        );
869        Ok(())
870    }
871
872    async fn disconnect(&mut self) -> anyhow::Result<()> {
873        if !self.connected {
874            return Ok(());
875        }
876
877        tracing::info!("Disconnecting Hyperliquid execution client");
878
879        // Disconnect WebSocket
880        self.ws_client.disconnect().await?;
881
882        // Abort any pending tasks
883        self.abort_pending_tasks();
884
885        self.connected = false;
886        self.core.set_connected(false);
887
888        tracing::info!(
889            "Hyperliquid execution client {} disconnected",
890            self.core.client_id
891        );
892        Ok(())
893    }
894
895    async fn generate_order_status_report(
896        &self,
897        _cmd: &GenerateOrderStatusReport,
898    ) -> anyhow::Result<Option<OrderStatusReport>> {
899        // NOTE: Single order status report generation requires instrument cache integration.
900        // The HTTP client methods and parsing functions are implemented and ready to use.
901        // When implemented: query via info_order_status(), parse with parse_order_status_report_from_basic().
902        tracing::warn!("generate_order_status_report not yet fully implemented");
903        Ok(None)
904    }
905
906    async fn generate_order_status_reports(
907        &self,
908        cmd: &GenerateOrderStatusReport,
909    ) -> anyhow::Result<Vec<OrderStatusReport>> {
910        // NOTE: Order status reports generation infrastructure is complete:
911        // HTTP methods: info_open_orders(), info_frontend_open_orders()
912        // Parsing: parse_order_status_report_from_basic() and parse_order_status_report_from_ws()
913        // Status mapping: All order statuses and types supported
914        //  Pending: Instrument cache integration to look up instruments by ID
915        // Implementation: Fetch via info_frontend_open_orders(), parse each order, filter by cmd params
916
917        tracing::warn!("generate_order_status_reports requires instrument cache integration");
918
919        // Log what would be queried
920        if let Some(instrument_id) = cmd.instrument_id {
921            tracing::debug!("Would query orders for instrument: {}", instrument_id);
922        }
923        if let Some(client_order_id) = cmd.client_order_id {
924            tracing::debug!("Would filter by client_order_id: {}", client_order_id);
925        }
926        if let Some(venue_order_id) = cmd.venue_order_id {
927            tracing::debug!("Would filter by venue_order_id: {}", venue_order_id);
928        }
929
930        Ok(Vec::new())
931    }
932
933    async fn generate_fill_reports(
934        &self,
935        cmd: GenerateFillReports,
936    ) -> anyhow::Result<Vec<FillReport>> {
937        // NOTE: Fill reports generation infrastructure is complete:
938        // HTTP methods: info_user_fills() returns HyperliquidFills
939        // Parsing: parse_fill_report() with fee handling, liquidity side detection
940        // Money/Currency: Proper USDC fee integration
941        //  Pending: Instrument cache integration to look up instruments by symbol
942        // Implementation: Fetch via info_user_fills(), filter by time range, parse each fill
943
944        tracing::warn!("generate_fill_reports requires instrument cache integration");
945
946        // Log what would be queried
947        if let Some(start) = cmd.start {
948            tracing::debug!("Would filter fills from: {}", start);
949        }
950        if let Some(end) = cmd.end {
951            tracing::debug!("Would filter fills until: {}", end);
952        }
953        if let Some(instrument_id) = cmd.instrument_id {
954            tracing::debug!("Would filter fills for instrument: {}", instrument_id);
955        }
956
957        Ok(Vec::new())
958    }
959
960    async fn generate_position_status_reports(
961        &self,
962        cmd: &GeneratePositionReports,
963    ) -> anyhow::Result<Vec<PositionStatusReport>> {
964        // Get user address for API queries
965        let user_address = self.get_user_address()?;
966
967        // Query clearinghouse state from the API
968        let _response = self
969            .http_client
970            .info_clearinghouse_state(&user_address)
971            .await
972            .context("Failed to fetch clearinghouse state")?;
973
974        // NOTE: Position status reports infrastructure is complete:
975        // HTTP methods: info_clearinghouse_state() queries API successfully
976        // Models: ClearinghouseState, AssetPosition, PositionData all defined
977        // Parsing: parse_position_status_report() fully implemented
978        //  Pending: Instrument cache integration to look up instruments by coin symbol
979        // Implementation: Deserialize response to ClearinghouseState, iterate asset_positions,
980        //                parse each with parse_position_status_report(), filter by cmd params
981        tracing::warn!("Position status report parsing requires instrument cache integration");
982
983        // When cache available:
984        // 1. Deserialize clearinghouse state: serde_json::from_value::<ClearinghouseState>(response)
985        // 2. For each asset_position: look up instrument by position.coin
986        // 3. Parse: parse_position_status_report(&asset_position_json, instrument, account_id, ts_init)
987        // 4. Filter by cmd.instrument_id if specified
988
989        if cmd.instrument_id.is_some() {
990            tracing::debug!(
991                "Would filter positions by instrument_id: {:?}",
992                cmd.instrument_id
993            );
994        }
995
996        Ok(Vec::new())
997    }
998
999    async fn generate_mass_status(
1000        &self,
1001        lookback_mins: Option<u64>,
1002    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1003        tracing::warn!(
1004            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
1005        );
1006        // Full implementation would require:
1007        // 1. Query all orders within lookback window
1008        // 2. Query all fills within lookback window
1009        // 3. Query all positions
1010        // 4. Combine into ExecutionMassStatus
1011        Ok(None)
1012    }
1013}
1014
1015impl LiveExecutionClientExt for HyperliquidExecutionClient {
1016    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
1017        get_exec_event_sender()
1018    }
1019
1020    fn get_clock(&self) -> Ref<'_, dyn Clock> {
1021        self.core.clock().borrow()
1022    }
1023}
1024
1025impl HyperliquidExecutionClient {
1026    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1027        let mut handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1028        if handle_guard.is_some() {
1029            return Ok(());
1030        }
1031
1032        // Get user address for subscriptions
1033        let user_address = self.get_user_address()?;
1034        let account_id = self.core.account_id;
1035        let ws_client = self.ws_client.clone();
1036
1037        // Add instruments to WebSocket client cache
1038        // This ensures instruments are available for parsing
1039        let runtime = get_runtime();
1040        let instruments = runtime.block_on(async {
1041            self.http_client
1042                .request_instruments()
1043                .await
1044                .unwrap_or_default()
1045        });
1046
1047        for instrument in instruments {
1048            ws_client.add_instrument(instrument);
1049        }
1050
1051        // Spawn background task to process WebSocket messages
1052        let handle = runtime.spawn(async move {
1053            // Ensure connection and subscribe
1054            if let Err(e) = ws_client.ensure_connected().await {
1055                tracing::warn!("Failed to connect WebSocket: {}", e);
1056                return;
1057            }
1058
1059            if let Err(e) = ws_client.subscribe_order_updates(&user_address).await {
1060                tracing::warn!("Failed to subscribe to order updates: {}", e);
1061                return;
1062            }
1063
1064            if let Err(e) = ws_client.subscribe_user_events(&user_address).await {
1065                tracing::warn!("Failed to subscribe to user events: {}", e);
1066                return;
1067            }
1068
1069            tracing::info!("Subscribed to Hyperliquid execution updates");
1070
1071            let clock = get_atomic_clock_realtime();
1072
1073            // Process messages
1074            loop {
1075                let event = ws_client.next_event().await;
1076
1077                match event {
1078                    Some(msg) => {
1079                        match &msg {
1080                            HyperliquidWsMsg::OrderUpdates { data } => {
1081                                let mut exec_reports = Vec::new();
1082
1083                                // Process each order update in the array
1084                                for order_update in data {
1085                                    if let Some(instrument) =
1086                                        ws_client.get_instrument_by_symbol(&order_update.order.coin)
1087                                    {
1088                                        let ts_init = clock.get_time_ns();
1089
1090                                        match parse_ws_order_status_report(
1091                                            order_update,
1092                                            &instrument,
1093                                            account_id,
1094                                            ts_init,
1095                                        ) {
1096                                            Ok(report) => {
1097                                                exec_reports.push(ExecutionReport::Order(report));
1098                                            }
1099                                            Err(e) => {
1100                                                tracing::warn!("Error parsing order update: {}", e);
1101                                            }
1102                                        }
1103                                    } else {
1104                                        tracing::warn!(
1105                                            "No instrument found for symbol: {}",
1106                                            order_update.order.coin
1107                                        );
1108                                    }
1109                                }
1110
1111                                // Dispatch reports if any
1112                                if !exec_reports.is_empty() {
1113                                    for report in exec_reports {
1114                                        dispatch_execution_report(report);
1115                                    }
1116                                }
1117                            }
1118                            HyperliquidWsMsg::UserEvents { data } => {
1119                                use crate::websocket::messages::WsUserEventData;
1120
1121                                let ts_init = clock.get_time_ns();
1122
1123                                match data {
1124                                    WsUserEventData::Fills { fills } => {
1125                                        let mut exec_reports = Vec::new();
1126
1127                                        // Process each fill
1128                                        for fill in fills {
1129                                            if let Some(instrument) =
1130                                                ws_client.get_instrument_by_symbol(&fill.coin)
1131                                            {
1132                                                match parse_ws_fill_report(
1133                                                    fill,
1134                                                    &instrument,
1135                                                    account_id,
1136                                                    ts_init,
1137                                                ) {
1138                                                    Ok(report) => {
1139                                                        exec_reports
1140                                                            .push(ExecutionReport::Fill(report));
1141                                                    }
1142                                                    Err(e) => {
1143                                                        tracing::warn!("Error parsing fill: {}", e);
1144                                                    }
1145                                                }
1146                                            } else {
1147                                                tracing::warn!(
1148                                                    "No instrument found for symbol: {}",
1149                                                    fill.coin
1150                                                );
1151                                            }
1152                                        }
1153
1154                                        // Dispatch reports if any
1155                                        if !exec_reports.is_empty() {
1156                                            for report in exec_reports {
1157                                                dispatch_execution_report(report);
1158                                            }
1159                                        }
1160                                    }
1161                                    _ => {
1162                                        // Other user events (funding, liquidation, etc.) not handled yet
1163                                    }
1164                                }
1165                            }
1166                            _ => {
1167                                // Ignore other message types in execution stream
1168                            }
1169                        }
1170                    }
1171                    None => {
1172                        // Connection closed
1173                        tracing::warn!("Hyperliquid WebSocket connection closed");
1174                        break;
1175                    }
1176                }
1177            }
1178        });
1179
1180        *handle_guard = Some(handle);
1181        tracing::info!("Hyperliquid WebSocket execution stream started");
1182        Ok(())
1183    }
1184}
1185
1186fn dispatch_execution_report(report: ExecutionReport) {
1187    let sender = get_exec_event_sender();
1188    match report {
1189        ExecutionReport::Order(order_report) => {
1190            let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1191            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1192                tracing::warn!("Failed to send order status report: {e}");
1193            }
1194        }
1195        ExecutionReport::Fill(fill_report) => {
1196            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1197            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1198                tracing::warn!("Failed to send fill report: {e}");
1199            }
1200        }
1201    }
1202}
1203
1204// Re-export execution models from the http module
1205pub use crate::http::models::{
1206    AssetId, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
1207    HyperliquidExecCancelByCloidRequest, HyperliquidExecCancelOrderRequest,
1208    HyperliquidExecCancelResponseData, HyperliquidExecCancelStatus, HyperliquidExecFilledInfo,
1209    HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecModifyOrderRequest,
1210    HyperliquidExecModifyResponseData, HyperliquidExecModifyStatus, HyperliquidExecOrderKind,
1211    HyperliquidExecOrderResponseData, HyperliquidExecOrderStatus, HyperliquidExecPlaceOrderRequest,
1212    HyperliquidExecRequest, HyperliquidExecResponse, HyperliquidExecResponseData,
1213    HyperliquidExecRestingInfo, HyperliquidExecTif, HyperliquidExecTpSl,
1214    HyperliquidExecTriggerParams, HyperliquidExecTwapRequest, OrderId,
1215};