nautilus_dydx/grpc/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! gRPC client implementation for dYdX v4 protocol.
17//!
18//! This module provides the main gRPC client for interacting with dYdX v4 validator nodes.
19//! It handles simulating and broadcasting signed transactions and querying chain and account state.
20
21use prost::Message as ProstMessage;
22use tonic::transport::Channel;
23
24use crate::{
25    error::DydxError,
26    proto::{
27        cosmos_sdk_proto::cosmos::{
28            auth::v1beta1::{
29                BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
30            },
31            bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
32            base::{
33                tendermint::v1beta1::{
34                    Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
35                    service_client::ServiceClient as BaseClient,
36                },
37                v1beta1::Coin,
38            },
39            tx::v1beta1::{
40                BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
41                service_client::ServiceClient as TxClient,
42            },
43        },
44        dydxprotocol::{
45            clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
46            perpetuals::{
47                Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
48            },
49            subaccounts::{
50                QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
51                query_client::QueryClient as SubaccountsClient,
52            },
53        },
54    },
55};
56
57/// Transaction hash as a string, as returned in the `txhash` field of a broadcast response.
58pub type TxHash = String;
59
/// Height of a block, stored as a `u32`.
///
/// Heights compare and order by their numeric value.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Height(pub u32);
63
64/// gRPC client for dYdX v4 protocol operations.
65///
66/// This client handles:
67/// - Transaction signing and broadcasting.
68/// - Account query operations.
69/// - Order placement and management via Cosmos SDK messages.
70/// - Connection management and automatic failover to fallback nodes.
71#[derive(Debug, Clone)]
72pub struct DydxGrpcClient {
73    channel: Channel,
74    auth: AuthClient<Channel>,
75    bank: BankClient<Channel>,
76    base: BaseClient<Channel>,
77    tx: TxClient<Channel>,
78    clob: ClobClient<Channel>,
79    perpetuals: PerpetualsClient<Channel>,
80    subaccounts: SubaccountsClient<Channel>,
81    current_url: String,
82}
83
84impl DydxGrpcClient {
85    /// Create a new gRPC client with a single URL.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if the gRPC connection cannot be established.
90    pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
91        let mut endpoint = Channel::from_shared(grpc_url.clone())
92            .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?;
93
94        // Enable TLS for HTTPS URLs (required for public gRPC nodes)
95        if grpc_url.starts_with("https://") {
96            let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
97            endpoint = endpoint
98                .tls_config(tls)
99                .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
100        }
101
102        let channel = endpoint.connect().await.map_err(|e| {
103            DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
104                "Connection failed: {e}"
105            ))))
106        })?;
107
108        Ok(Self {
109            auth: AuthClient::new(channel.clone()),
110            bank: BankClient::new(channel.clone()),
111            base: BaseClient::new(channel.clone()),
112            tx: TxClient::new(channel.clone()),
113            clob: ClobClient::new(channel.clone()),
114            perpetuals: PerpetualsClient::new(channel.clone()),
115            subaccounts: SubaccountsClient::new(channel.clone()),
116            channel,
117            current_url: grpc_url,
118        })
119    }
120
121    /// Create a new gRPC client with fallback support.
122    ///
123    /// Attempts to connect to each URL in the provided list until a successful
124    /// connection is established. This is useful for DEX environments where nodes
125    /// can fail and fallback options are needed.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if none of the provided URLs can establish a connection.
130    pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
131        if grpc_urls.is_empty() {
132            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
133        }
134
135        let mut last_error = None;
136
137        for (idx, url) in grpc_urls.iter().enumerate() {
138            let url_str = url.as_ref();
139            tracing::debug!(
140                "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
141                idx + 1,
142                grpc_urls.len()
143            );
144
145            match Self::new(url_str.to_string()).await {
146                Ok(client) => {
147                    tracing::info!("Successfully connected to gRPC node: {url_str}");
148                    return Ok(client);
149                }
150                Err(e) => {
151                    tracing::warn!("Failed to connect to gRPC node {url_str}: {e}");
152                    last_error = Some(e);
153                }
154            }
155        }
156
157        Err(last_error.unwrap_or_else(|| {
158            DydxError::Grpc(Box::new(tonic::Status::unavailable(
159                "All gRPC connection attempts failed".to_string(),
160            )))
161        }))
162    }
163
164    /// Reconnect to a different gRPC node from the fallback list.
165    ///
166    /// Attempts to establish a new connection to each URL in the provided list
167    /// until successful. This is useful when the current node fails and you need
168    /// to failover to a different validator node.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if none of the provided URLs can establish a connection.
173    pub async fn reconnect_with_fallback(
174        &mut self,
175        grpc_urls: &[impl AsRef<str>],
176    ) -> Result<(), DydxError> {
177        if grpc_urls.is_empty() {
178            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
179        }
180
181        let mut last_error = None;
182
183        for (idx, url) in grpc_urls.iter().enumerate() {
184            let url_str = url.as_ref();
185
186            // Skip if it's the same URL we're currently connected to
187            if url_str == self.current_url {
188                tracing::debug!("Skipping current URL: {url_str}");
189                continue;
190            }
191
192            tracing::debug!(
193                "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
194                idx + 1,
195                grpc_urls.len()
196            );
197
198            let channel = match Channel::from_shared(url_str.to_string())
199                .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
200            {
201                Ok(ch) => ch,
202                Err(e) => {
203                    last_error = Some(e);
204                    continue;
205                }
206            };
207
208            match channel.connect().await {
209                Ok(connected_channel) => {
210                    tracing::info!("Successfully reconnected to gRPC node: {url_str}");
211
212                    // Update all service clients with the new channel
213                    self.channel = connected_channel.clone();
214                    self.auth = AuthClient::new(connected_channel.clone());
215                    self.bank = BankClient::new(connected_channel.clone());
216                    self.base = BaseClient::new(connected_channel.clone());
217                    self.tx = TxClient::new(connected_channel.clone());
218                    self.clob = ClobClient::new(connected_channel.clone());
219                    self.perpetuals = PerpetualsClient::new(connected_channel.clone());
220                    self.subaccounts = SubaccountsClient::new(connected_channel);
221                    self.current_url = url_str.to_string();
222
223                    return Ok(());
224                }
225                Err(e) => {
226                    tracing::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
227                    last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
228                        format!("Connection failed: {e}"),
229                    ))));
230                }
231            }
232        }
233
234        Err(last_error.unwrap_or_else(|| {
235            DydxError::Grpc(Box::new(tonic::Status::unavailable(
236                "All gRPC reconnection attempts failed".to_string(),
237            )))
238        }))
239    }
240
241    /// Get the currently connected gRPC node URL.
242    #[must_use]
243    pub fn current_url(&self) -> &str {
244        &self.current_url
245    }
246
247    /// Get the underlying gRPC channel.
248    ///
249    /// This can be used to create custom gRPC service clients.
250    #[must_use]
251    pub fn channel(&self) -> &Channel {
252        &self.channel
253    }
254
255    /// Query account information for a given address.
256    ///
257    /// Returns the account number and sequence number needed for transaction signing.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the query fails or the account does not exist.
262    pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
263        let req = QueryAccountRequest {
264            address: address.to_string(),
265        };
266        let resp = self
267            .auth
268            .account(req)
269            .await?
270            .into_inner()
271            .account
272            .ok_or_else(|| {
273                anyhow::anyhow!("Query account request failure, account should exist")
274            })?;
275
276        let account = BaseAccount::decode(&*resp.value)?;
277        Ok((account.account_number, account.sequence))
278    }
279
280    /// Query for [an account](https://github.com/cosmos/cosmos-sdk/tree/main/x/auth#account-1)
281    /// by its address.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the query fails or the account does not exist.
286    pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
287        let req = QueryAccountRequest {
288            address: address.to_string(),
289        };
290        let resp = self
291            .auth
292            .account(req)
293            .await?
294            .into_inner()
295            .account
296            .ok_or_else(|| {
297                anyhow::anyhow!("Query account request failure, account should exist")
298            })?;
299
300        Ok(BaseAccount::decode(&*resp.value)?)
301    }
302
303    /// Query for [account balances](https://github.com/cosmos/cosmos-sdk/tree/main/x/bank#allbalances)
304    /// by address for all denominations.
305    ///
306    /// # Errors
307    ///
308    /// Returns an error if the query fails.
309    pub async fn get_account_balances(
310        &mut self,
311        address: &str,
312    ) -> Result<Vec<Coin>, anyhow::Error> {
313        let req = QueryAllBalancesRequest {
314            address: address.to_string(),
315            resolve_denom: false,
316            pagination: None,
317        };
318        let balances = self.bank.all_balances(req).await?.into_inner().balances;
319        Ok(balances)
320    }
321
322    /// Query for node info.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the query fails.
327    pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
328        let req = GetNodeInfoRequest {};
329        let info = self.base.get_node_info(req).await?.into_inner();
330        Ok(info)
331    }
332
333    /// Query for the latest block.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if the query fails.
338    pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
339        let req = GetLatestBlockRequest::default();
340        let latest_block = self
341            .base
342            .get_latest_block(req)
343            .await?
344            .into_inner()
345            .sdk_block
346            .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
347        Ok(latest_block)
348    }
349
350    /// Query for the latest block height.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the query fails.
355    pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
356        let latest_block = self.latest_block().await?;
357        let header = latest_block
358            .header
359            .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
360        let height = Height(header.height.try_into()?);
361        Ok(height)
362    }
363
364    /// Query for all perpetual markets.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if the query fails.
369    pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
370        let req = QueryAllPerpetualsRequest { pagination: None };
371        let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
372        Ok(response.perpetual)
373    }
374
375    /// Query for all CLOB pairs.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the query fails.
380    pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
381        let req = QueryAllClobPairRequest { pagination: None };
382        let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
383        Ok(pairs)
384    }
385
386    /// Query for subaccount information.
387    ///
388    /// # Errors
389    ///
390    /// Returns an error if the query fails.
391    pub async fn get_subaccount(
392        &mut self,
393        address: &str,
394        number: u32,
395    ) -> Result<SubaccountInfo, anyhow::Error> {
396        let req = QueryGetSubaccountRequest {
397            owner: address.to_string(),
398            number,
399        };
400        let subaccount = self
401            .subaccounts
402            .subaccount(req)
403            .await?
404            .into_inner()
405            .subaccount
406            .ok_or_else(|| {
407                anyhow::anyhow!("Subaccount query response does not contain subaccount")
408            })?;
409        Ok(subaccount)
410    }
411
412    /// Simulate a transaction to estimate gas usage.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if simulation fails.
417    #[allow(deprecated)]
418    pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
419        let req = SimulateRequest { tx_bytes, tx: None };
420        let gas_used = self
421            .tx
422            .simulate(req)
423            .await?
424            .into_inner()
425            .gas_info
426            .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
427            .gas_used;
428        Ok(gas_used)
429    }
430
431    /// Broadcast a signed transaction.
432    ///
433    /// # Errors
434    ///
435    /// Returns an error if broadcasting fails.
436    pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
437        let req = BroadcastTxRequest {
438            tx_bytes,
439            mode: BroadcastMode::Sync as i32,
440        };
441        let response = self.tx.broadcast_tx(req).await?.into_inner();
442
443        if let Some(tx_response) = response.tx_response {
444            if tx_response.code != 0 {
445                anyhow::bail!(
446                    "Transaction broadcast failed: code={}, log={}",
447                    tx_response.code,
448                    tx_response.raw_log
449                );
450            }
451            Ok(tx_response.txhash)
452        } else {
453            Err(anyhow::anyhow!(
454                "Broadcast response does not contain tx_response"
455            ))
456        }
457    }
458
459    /// Query transaction by hash.
460    ///
461    /// # Errors
462    ///
463    /// Returns an error if the query fails.
464    pub async fn get_tx(&mut self, hash: &str) -> Result<cosmrs::Tx, anyhow::Error> {
465        let req = GetTxRequest {
466            hash: hash.to_string(),
467        };
468        let response = self.tx.get_tx(req).await?.into_inner();
469
470        if let Some(tx) = response.tx {
471            // Convert through bytes since the types are incompatible
472            let tx_bytes = tx.encode_to_vec();
473            cosmrs::Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
474        } else {
475            anyhow::bail!("Transaction not found")
476        }
477    }
478}
479
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Heights order and compare by their inner value.
    #[rstest]
    fn test_height_ordering() {
        let lower = Height(100);
        let upper = Height(200);
        assert!(lower < upper);
        assert_eq!(lower, Height(100));
    }

    /// An empty URL list is rejected with a `Config` error before any connection attempt.
    #[tokio::test]
    async fn test_new_with_fallback_empty_urls() {
        let no_urls: &[&str] = &[];
        let outcome = DydxGrpcClient::new_with_fallback(no_urls).await;
        assert!(outcome.is_err());
        match outcome {
            Err(DydxError::Config(msg)) => assert_eq!(msg, "No gRPC URLs provided"),
            _ => panic!("Expected Config error"),
        }
    }

    /// URLs that cannot be connected make the constructor fail (Config or Grpc error).
    #[tokio::test]
    async fn test_new_with_fallback_invalid_urls() {
        let invalid_urls = ["invalid://bad-url", "http://0.0.0.0:1"];
        let outcome = DydxGrpcClient::new_with_fallback(&invalid_urls[..]).await;

        assert!(outcome.is_err());
    }
}