nautilus_dydx/grpc/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! gRPC client implementation for dYdX v4 protocol.
17//!
18//! This module provides the main gRPC client for interacting with dYdX v4 validator nodes.
19//! It handles transaction signing, broadcasting, and querying account state.
20
21use prost::Message as ProstMessage;
22use tonic::transport::Channel;
23
24use crate::{
25    error::DydxError,
26    proto::{
27        cosmos_sdk_proto::cosmos::{
28            auth::v1beta1::{
29                BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
30            },
31            bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
32            base::{
33                tendermint::v1beta1::{
34                    Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
35                    service_client::ServiceClient as BaseClient,
36                },
37                v1beta1::Coin,
38            },
39            tx::v1beta1::{
40                BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
41                service_client::ServiceClient as TxClient,
42            },
43        },
44        dydxprotocol::{
45            clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
46            perpetuals::{
47                Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
48            },
49            subaccounts::{
50                QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
51                query_client::QueryClient as SubaccountsClient,
52            },
53        },
54    },
55};
56
/// Transaction hash, represented as the plain string reported by the node.
58pub type TxHash = String;
59
/// Block height, stored as an unsigned 32-bit block number.
///
/// Equality and ordering follow the wrapped number, and the type is hashable so it can be
/// used as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Height(pub u32);
63
64/// gRPC client for dYdX v4 protocol operations.
65///
66/// This client handles:
67/// - Transaction signing and broadcasting.
68/// - Account query operations.
69/// - Order placement and management via Cosmos SDK messages.
70/// - Connection management and automatic failover to fallback nodes.
71#[derive(Debug, Clone)]
72pub struct DydxGrpcClient {
73    channel: Channel,
74    auth: AuthClient<Channel>,
75    bank: BankClient<Channel>,
76    base: BaseClient<Channel>,
77    tx: TxClient<Channel>,
78    clob: ClobClient<Channel>,
79    perpetuals: PerpetualsClient<Channel>,
80    subaccounts: SubaccountsClient<Channel>,
81    current_url: String,
82}
83
84impl DydxGrpcClient {
85    /// Create a new gRPC client with a single URL.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if the gRPC connection cannot be established.
90    pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
91        let mut endpoint = Channel::from_shared(grpc_url.clone())
92            .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?;
93
94        // Enable TLS for HTTPS URLs (required for public gRPC nodes)
95        if grpc_url.starts_with("https://") {
96            let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
97            endpoint = endpoint
98                .tls_config(tls)
99                .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
100        }
101
102        let channel = endpoint.connect().await.map_err(|e| {
103            DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
104                "Connection failed: {e}"
105            ))))
106        })?;
107
108        Ok(Self {
109            auth: AuthClient::new(channel.clone()),
110            bank: BankClient::new(channel.clone()),
111            base: BaseClient::new(channel.clone()),
112            tx: TxClient::new(channel.clone()),
113            clob: ClobClient::new(channel.clone()),
114            perpetuals: PerpetualsClient::new(channel.clone()),
115            subaccounts: SubaccountsClient::new(channel.clone()),
116            channel,
117            current_url: grpc_url,
118        })
119    }
120
121    /// Create a new gRPC client with fallback support.
122    ///
123    /// Attempts to connect to each URL in the provided list until a successful
124    /// connection is established. This is useful for DEX environments where nodes
125    /// can fail and fallback options are needed.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if none of the provided URLs can establish a connection.
130    pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
131        if grpc_urls.is_empty() {
132            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
133        }
134
135        let mut last_error = None;
136
137        for (idx, url) in grpc_urls.iter().enumerate() {
138            let url_str = url.as_ref();
139            log::debug!(
140                "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
141                idx + 1,
142                grpc_urls.len()
143            );
144
145            match Self::new(url_str.to_string()).await {
146                Ok(client) => {
147                    log::info!("Successfully connected to gRPC node: {url_str}");
148                    return Ok(client);
149                }
150                Err(e) => {
151                    log::warn!("Failed to connect to gRPC node {url_str}: {e}");
152                    last_error = Some(e);
153                }
154            }
155        }
156
157        Err(last_error.unwrap_or_else(|| {
158            DydxError::Grpc(Box::new(tonic::Status::unavailable(
159                "All gRPC connection attempts failed".to_string(),
160            )))
161        }))
162    }
163
164    /// Reconnect to a different gRPC node from the fallback list.
165    ///
166    /// Attempts to establish a new connection to each URL in the provided list
167    /// until successful. This is useful when the current node fails and you need
168    /// to failover to a different validator node.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if none of the provided URLs can establish a connection.
173    pub async fn reconnect_with_fallback(
174        &mut self,
175        grpc_urls: &[impl AsRef<str>],
176    ) -> Result<(), DydxError> {
177        if grpc_urls.is_empty() {
178            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
179        }
180
181        let mut last_error = None;
182
183        for (idx, url) in grpc_urls.iter().enumerate() {
184            let url_str = url.as_ref();
185
186            // Skip if it's the same URL we're currently connected to
187            if url_str == self.current_url {
188                log::debug!("Skipping current URL: {url_str}");
189                continue;
190            }
191
192            log::debug!(
193                "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
194                idx + 1,
195                grpc_urls.len()
196            );
197
198            let mut endpoint = match Channel::from_shared(url_str.to_string())
199                .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
200            {
201                Ok(ep) => ep,
202                Err(e) => {
203                    last_error = Some(e);
204                    continue;
205                }
206            };
207
208            // Enable TLS for HTTPS URLs (required for public gRPC nodes)
209            if url_str.starts_with("https://") {
210                let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
211                endpoint = match endpoint.tls_config(tls) {
212                    Ok(ep) => ep,
213                    Err(e) => {
214                        last_error = Some(DydxError::Config(format!("TLS config failed: {e}")));
215                        continue;
216                    }
217                };
218            }
219
220            match endpoint.connect().await {
221                Ok(connected_channel) => {
222                    log::info!("Successfully reconnected to gRPC node: {url_str}");
223
224                    // Update all service clients with the new channel
225                    self.channel = connected_channel.clone();
226                    self.auth = AuthClient::new(connected_channel.clone());
227                    self.bank = BankClient::new(connected_channel.clone());
228                    self.base = BaseClient::new(connected_channel.clone());
229                    self.tx = TxClient::new(connected_channel.clone());
230                    self.clob = ClobClient::new(connected_channel.clone());
231                    self.perpetuals = PerpetualsClient::new(connected_channel.clone());
232                    self.subaccounts = SubaccountsClient::new(connected_channel);
233                    self.current_url = url_str.to_string();
234
235                    return Ok(());
236                }
237                Err(e) => {
238                    log::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
239                    last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
240                        format!("Connection failed: {e}"),
241                    ))));
242                }
243            }
244        }
245
246        Err(last_error.unwrap_or_else(|| {
247            DydxError::Grpc(Box::new(tonic::Status::unavailable(
248                "All gRPC reconnection attempts failed".to_string(),
249            )))
250        }))
251    }
252
253    /// Get the currently connected gRPC node URL.
254    #[must_use]
255    pub fn current_url(&self) -> &str {
256        &self.current_url
257    }
258
259    /// Get the underlying gRPC channel.
260    ///
261    /// This can be used to create custom gRPC service clients.
262    #[must_use]
263    pub fn channel(&self) -> &Channel {
264        &self.channel
265    }
266
267    /// Query account information for a given address.
268    ///
269    /// Returns the account number and sequence number needed for transaction signing.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the query fails or the account does not exist.
274    pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
275        let req = QueryAccountRequest {
276            address: address.to_string(),
277        };
278        let resp = self
279            .auth
280            .account(req)
281            .await?
282            .into_inner()
283            .account
284            .ok_or_else(|| {
285                anyhow::anyhow!("Query account request failure, account should exist")
286            })?;
287
288        let account = BaseAccount::decode(&*resp.value)?;
289        Ok((account.account_number, account.sequence))
290    }
291
292    /// Query for [an account](https://github.com/cosmos/cosmos-sdk/tree/main/x/auth#account-1)
293    /// by its address.
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if the query fails or the account does not exist.
298    pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
299        let req = QueryAccountRequest {
300            address: address.to_string(),
301        };
302        let resp = self
303            .auth
304            .account(req)
305            .await?
306            .into_inner()
307            .account
308            .ok_or_else(|| {
309                anyhow::anyhow!("Query account request failure, account should exist")
310            })?;
311
312        Ok(BaseAccount::decode(&*resp.value)?)
313    }
314
315    /// Query for [account balances](https://github.com/cosmos/cosmos-sdk/tree/main/x/bank#allbalances)
316    /// by address for all denominations.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the query fails.
321    pub async fn get_account_balances(
322        &mut self,
323        address: &str,
324    ) -> Result<Vec<Coin>, anyhow::Error> {
325        let req = QueryAllBalancesRequest {
326            address: address.to_string(),
327            resolve_denom: false,
328            pagination: None,
329        };
330        let balances = self.bank.all_balances(req).await?.into_inner().balances;
331        Ok(balances)
332    }
333
334    /// Query for node info.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the query fails.
339    pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
340        let req = GetNodeInfoRequest {};
341        let info = self.base.get_node_info(req).await?.into_inner();
342        Ok(info)
343    }
344
345    /// Query for the latest block.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if the query fails.
350    pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
351        let req = GetLatestBlockRequest::default();
352        let latest_block = self
353            .base
354            .get_latest_block(req)
355            .await?
356            .into_inner()
357            .sdk_block
358            .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
359        Ok(latest_block)
360    }
361
362    /// Query for the latest block height.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if the query fails.
367    pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
368        let latest_block = self.latest_block().await?;
369        let header = latest_block
370            .header
371            .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
372        let height = Height(header.height.try_into()?);
373        Ok(height)
374    }
375
376    /// Query for all perpetual markets.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if the query fails.
381    pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
382        let req = QueryAllPerpetualsRequest { pagination: None };
383        let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
384        Ok(response.perpetual)
385    }
386
387    /// Query for all CLOB pairs.
388    ///
389    /// # Errors
390    ///
391    /// Returns an error if the query fails.
392    pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
393        let req = QueryAllClobPairRequest { pagination: None };
394        let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
395        Ok(pairs)
396    }
397
398    /// Query for subaccount information.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if the query fails.
403    pub async fn get_subaccount(
404        &mut self,
405        address: &str,
406        number: u32,
407    ) -> Result<SubaccountInfo, anyhow::Error> {
408        let req = QueryGetSubaccountRequest {
409            owner: address.to_string(),
410            number,
411        };
412        let subaccount = self
413            .subaccounts
414            .subaccount(req)
415            .await?
416            .into_inner()
417            .subaccount
418            .ok_or_else(|| {
419                anyhow::anyhow!("Subaccount query response does not contain subaccount")
420            })?;
421        Ok(subaccount)
422    }
423
424    /// Simulate a transaction to estimate gas usage.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if simulation fails.
429    #[allow(deprecated)]
430    pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
431        let req = SimulateRequest { tx_bytes, tx: None };
432        let gas_used = self
433            .tx
434            .simulate(req)
435            .await?
436            .into_inner()
437            .gas_info
438            .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
439            .gas_used;
440        Ok(gas_used)
441    }
442
443    /// Broadcast a signed transaction.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if broadcasting fails.
448    pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
449        let req = BroadcastTxRequest {
450            tx_bytes,
451            mode: BroadcastMode::Sync as i32,
452        };
453        let response = self.tx.broadcast_tx(req).await?.into_inner();
454
455        if let Some(tx_response) = response.tx_response {
456            if tx_response.code != 0 {
457                anyhow::bail!(
458                    "Transaction broadcast failed: code={}, log={}",
459                    tx_response.code,
460                    tx_response.raw_log
461                );
462            }
463            Ok(tx_response.txhash)
464        } else {
465            Err(anyhow::anyhow!(
466                "Broadcast response does not contain tx_response"
467            ))
468        }
469    }
470
471    /// Query transaction by hash.
472    ///
473    /// # Errors
474    ///
475    /// Returns an error if the query fails.
476    pub async fn get_tx(&mut self, hash: &str) -> Result<cosmrs::Tx, anyhow::Error> {
477        let req = GetTxRequest {
478            hash: hash.to_string(),
479        };
480        let response = self.tx.get_tx(req).await?.into_inner();
481
482        if let Some(tx) = response.tx {
483            // Convert through bytes since the types are incompatible
484            let tx_bytes = tx.encode_to_vec();
485            cosmrs::Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
486        } else {
487            anyhow::bail!("Transaction not found")
488        }
489    }
490}
491
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// `Height` compares by its wrapped block number.
    #[rstest]
    fn test_height_ordering() {
        let lower = Height(100);
        let higher = Height(200);
        assert!(lower < higher);
        assert_eq!(lower, Height(100));
    }

    /// An empty URL list is rejected with a configuration error.
    #[tokio::test]
    async fn test_new_with_fallback_empty_urls() {
        let no_urls: &[&str] = &[];
        let result = DydxGrpcClient::new_with_fallback(no_urls).await;
        assert!(result.is_err());
        match result {
            Err(DydxError::Config(msg)) => assert_eq!(msg, "No gRPC URLs provided"),
            _ => panic!("Expected Config error"),
        }
    }

    /// URLs that cannot be connected to make the constructor fail.
    #[tokio::test]
    async fn test_new_with_fallback_invalid_urls() {
        // Neither of these URLs is expected to accept a connection.
        let invalid_urls = vec!["invalid://bad-url", "http://0.0.0.0:1"];
        let outcome = DydxGrpcClient::new_with_fallback(&invalid_urls).await;

        // Either a Config or a Grpc error is acceptable here.
        assert!(outcome.is_err());
    }
}
526}