Skip to main content

nautilus_dydx/grpc/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! gRPC client implementation for dYdX v4 protocol.
17//!
18//! This module provides the main gRPC client for interacting with dYdX v4 validator nodes.
19//! It handles transaction signing, broadcasting, and querying account state.
20
21use prost::Message as ProstMessage;
22use tonic::transport::Channel;
23
24use crate::{
25    error::DydxError,
26    proto::{
27        AccountAuthenticator, AccountPlusClient, GetAuthenticatorsRequest,
28        cosmos_sdk_proto::cosmos::{
29            auth::v1beta1::{
30                BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
31            },
32            bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
33            base::{
34                tendermint::v1beta1::{
35                    Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
36                    service_client::ServiceClient as BaseClient,
37                },
38                v1beta1::Coin,
39            },
40            tx::v1beta1::{
41                BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
42                service_client::ServiceClient as TxClient,
43            },
44        },
45        dydxprotocol::{
46            clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
47            perpetuals::{
48                Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
49            },
50            subaccounts::{
51                QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
52                query_client::QueryClient as SubaccountsClient,
53            },
54        },
55    },
56};
57
58/// Transaction hash type (internally uses tendermint::Hash).
59pub type TxHash = String;
60
61/// Block height.
62#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
63pub struct Height(pub u32);
64
65/// gRPC client for dYdX v4 protocol operations.
66///
67/// This client handles:
68/// - Transaction signing and broadcasting.
69/// - Account query operations.
70/// - Order placement and management via Cosmos SDK messages.
71/// - Connection management and automatic failover to fallback nodes.
72#[derive(Debug, Clone)]
73pub struct DydxGrpcClient {
74    channel: Channel,
75    auth: AuthClient<Channel>,
76    bank: BankClient<Channel>,
77    base: BaseClient<Channel>,
78    tx: TxClient<Channel>,
79    clob: ClobClient<Channel>,
80    perpetuals: PerpetualsClient<Channel>,
81    subaccounts: SubaccountsClient<Channel>,
82    accountplus: AccountPlusClient<Channel>,
83    current_url: String,
84}
85
86impl DydxGrpcClient {
87    /// Create a new gRPC client with a single URL.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the gRPC connection cannot be established.
92    pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
93        let mut endpoint = Channel::from_shared(grpc_url.clone())
94            .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?;
95
96        // Enable TLS for HTTPS URLs (required for public gRPC nodes)
97        if grpc_url.starts_with("https://") {
98            let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
99            endpoint = endpoint
100                .tls_config(tls)
101                .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
102        }
103
104        let channel = endpoint.connect().await.map_err(|e| {
105            DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
106                "Connection failed: {e}"
107            ))))
108        })?;
109
110        Ok(Self {
111            auth: AuthClient::new(channel.clone()),
112            bank: BankClient::new(channel.clone()),
113            base: BaseClient::new(channel.clone()),
114            tx: TxClient::new(channel.clone()),
115            clob: ClobClient::new(channel.clone()),
116            perpetuals: PerpetualsClient::new(channel.clone()),
117            subaccounts: SubaccountsClient::new(channel.clone()),
118            accountplus: AccountPlusClient::new(channel.clone()),
119            channel,
120            current_url: grpc_url,
121        })
122    }
123
124    /// Create a new gRPC client with fallback support.
125    ///
126    /// Attempts to connect to each URL in the provided list until a successful
127    /// connection is established. This is useful for DEX environments where nodes
128    /// can fail and fallback options are needed.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if none of the provided URLs can establish a connection.
133    pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
134        if grpc_urls.is_empty() {
135            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
136        }
137
138        let mut last_error = None;
139
140        for (idx, url) in grpc_urls.iter().enumerate() {
141            let url_str = url.as_ref();
142            log::debug!(
143                "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
144                idx + 1,
145                grpc_urls.len()
146            );
147
148            match Self::new(url_str.to_string()).await {
149                Ok(client) => {
150                    log::info!("Successfully connected to gRPC node: {url_str}");
151                    return Ok(client);
152                }
153                Err(e) => {
154                    log::warn!("Failed to connect to gRPC node {url_str}: {e}");
155                    last_error = Some(e);
156                }
157            }
158        }
159
160        Err(last_error.unwrap_or_else(|| {
161            DydxError::Grpc(Box::new(tonic::Status::unavailable(
162                "All gRPC connection attempts failed".to_string(),
163            )))
164        }))
165    }
166
167    /// Reconnect to a different gRPC node from the fallback list.
168    ///
169    /// Attempts to establish a new connection to each URL in the provided list
170    /// until successful. This is useful when the current node fails and you need
171    /// to failover to a different validator node.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if none of the provided URLs can establish a connection.
176    pub async fn reconnect_with_fallback(
177        &mut self,
178        grpc_urls: &[impl AsRef<str>],
179    ) -> Result<(), DydxError> {
180        if grpc_urls.is_empty() {
181            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
182        }
183
184        let mut last_error = None;
185
186        for (idx, url) in grpc_urls.iter().enumerate() {
187            let url_str = url.as_ref();
188
189            // Skip if it's the same URL we're currently connected to
190            if url_str == self.current_url {
191                log::debug!("Skipping current URL: {url_str}");
192                continue;
193            }
194
195            log::debug!(
196                "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
197                idx + 1,
198                grpc_urls.len()
199            );
200
201            let mut endpoint = match Channel::from_shared(url_str.to_string())
202                .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
203            {
204                Ok(ep) => ep,
205                Err(e) => {
206                    last_error = Some(e);
207                    continue;
208                }
209            };
210
211            // Enable TLS for HTTPS URLs (required for public gRPC nodes)
212            if url_str.starts_with("https://") {
213                let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
214                endpoint = match endpoint.tls_config(tls) {
215                    Ok(ep) => ep,
216                    Err(e) => {
217                        last_error = Some(DydxError::Config(format!("TLS config failed: {e}")));
218                        continue;
219                    }
220                };
221            }
222
223            match endpoint.connect().await {
224                Ok(connected_channel) => {
225                    log::info!("Successfully reconnected to gRPC node: {url_str}");
226
227                    // Update all service clients with the new channel
228                    self.channel = connected_channel.clone();
229                    self.auth = AuthClient::new(connected_channel.clone());
230                    self.bank = BankClient::new(connected_channel.clone());
231                    self.base = BaseClient::new(connected_channel.clone());
232                    self.tx = TxClient::new(connected_channel.clone());
233                    self.clob = ClobClient::new(connected_channel.clone());
234                    self.perpetuals = PerpetualsClient::new(connected_channel.clone());
235                    self.subaccounts = SubaccountsClient::new(connected_channel);
236                    self.current_url = url_str.to_string();
237
238                    return Ok(());
239                }
240                Err(e) => {
241                    log::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
242                    last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
243                        format!("Connection failed: {e}"),
244                    ))));
245                }
246            }
247        }
248
249        Err(last_error.unwrap_or_else(|| {
250            DydxError::Grpc(Box::new(tonic::Status::unavailable(
251                "All gRPC reconnection attempts failed".to_string(),
252            )))
253        }))
254    }
255
256    /// Get the currently connected gRPC node URL.
257    #[must_use]
258    pub fn current_url(&self) -> &str {
259        &self.current_url
260    }
261
262    /// Get the underlying gRPC channel.
263    ///
264    /// This can be used to create custom gRPC service clients.
265    #[must_use]
266    pub fn channel(&self) -> &Channel {
267        &self.channel
268    }
269
270    /// Query account information for a given address.
271    ///
272    /// Returns the account number and sequence number needed for transaction signing.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if the query fails or the account does not exist.
277    pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
278        let req = QueryAccountRequest {
279            address: address.to_string(),
280        };
281        let resp = self
282            .auth
283            .account(req)
284            .await?
285            .into_inner()
286            .account
287            .ok_or_else(|| {
288                anyhow::anyhow!("Query account request failure, account should exist")
289            })?;
290
291        let account = BaseAccount::decode(&*resp.value)?;
292        Ok((account.account_number, account.sequence))
293    }
294
295    /// Query for [an account](https://github.com/cosmos/cosmos-sdk/tree/main/x/auth#account-1)
296    /// by its address.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if the query fails or the account does not exist.
301    pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
302        let req = QueryAccountRequest {
303            address: address.to_string(),
304        };
305        let resp = self
306            .auth
307            .account(req)
308            .await?
309            .into_inner()
310            .account
311            .ok_or_else(|| {
312                anyhow::anyhow!("Query account request failure, account should exist")
313            })?;
314
315        Ok(BaseAccount::decode(&*resp.value)?)
316    }
317
318    /// Query for [account balances](https://github.com/cosmos/cosmos-sdk/tree/main/x/bank#allbalances)
319    /// by address for all denominations.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the query fails.
324    pub async fn get_account_balances(
325        &mut self,
326        address: &str,
327    ) -> Result<Vec<Coin>, anyhow::Error> {
328        let req = QueryAllBalancesRequest {
329            address: address.to_string(),
330            resolve_denom: false,
331            pagination: None,
332        };
333        let balances = self.bank.all_balances(req).await?.into_inner().balances;
334        Ok(balances)
335    }
336
337    /// Query for authenticators registered for an account.
338    ///
339    /// Authenticators enable permissioned key trading, allowing API wallets
340    /// to sign transactions on behalf of a main account.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the query fails.
345    pub async fn get_authenticators(
346        &mut self,
347        address: &str,
348    ) -> Result<Vec<AccountAuthenticator>, anyhow::Error> {
349        let req = GetAuthenticatorsRequest {
350            account: address.to_string(),
351        };
352        let resp = self.accountplus.get_authenticators(req).await?.into_inner();
353        Ok(resp.account_authenticators)
354    }
355
356    /// Query for node info.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails.
361    pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
362        let req = GetNodeInfoRequest {};
363        let info = self.base.get_node_info(req).await?.into_inner();
364        Ok(info)
365    }
366
367    /// Query for the latest block.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if the query fails.
372    pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
373        let req = GetLatestBlockRequest::default();
374        let latest_block = self
375            .base
376            .get_latest_block(req)
377            .await?
378            .into_inner()
379            .sdk_block
380            .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
381        Ok(latest_block)
382    }
383
384    /// Query for the latest block height.
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if the query fails.
389    pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
390        let latest_block = self.latest_block().await?;
391        let header = latest_block
392            .header
393            .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
394        let height = Height(header.height.try_into()?);
395        Ok(height)
396    }
397
398    /// Query for all perpetual markets.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if the query fails.
403    pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
404        let req = QueryAllPerpetualsRequest { pagination: None };
405        let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
406        Ok(response.perpetual)
407    }
408
409    /// Query for all CLOB pairs.
410    ///
411    /// # Errors
412    ///
413    /// Returns an error if the query fails.
414    pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
415        let req = QueryAllClobPairRequest { pagination: None };
416        let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
417        Ok(pairs)
418    }
419
420    /// Query for subaccount information.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if the query fails.
425    pub async fn get_subaccount(
426        &mut self,
427        address: &str,
428        number: u32,
429    ) -> Result<SubaccountInfo, anyhow::Error> {
430        let req = QueryGetSubaccountRequest {
431            owner: address.to_string(),
432            number,
433        };
434        let subaccount = self
435            .subaccounts
436            .subaccount(req)
437            .await?
438            .into_inner()
439            .subaccount
440            .ok_or_else(|| {
441                anyhow::anyhow!("Subaccount query response does not contain subaccount")
442            })?;
443        Ok(subaccount)
444    }
445
446    /// Simulate a transaction to estimate gas usage.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if simulation fails.
451    #[allow(deprecated)]
452    pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
453        let req = SimulateRequest { tx_bytes, tx: None };
454        let gas_used = self
455            .tx
456            .simulate(req)
457            .await?
458            .into_inner()
459            .gas_info
460            .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
461            .gas_used;
462        Ok(gas_used)
463    }
464
465    /// Broadcast a signed transaction.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if broadcasting fails.
470    pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
471        let req = BroadcastTxRequest {
472            tx_bytes,
473            mode: BroadcastMode::Sync as i32,
474        };
475        let response = self.tx.broadcast_tx(req).await?.into_inner();
476
477        if let Some(tx_response) = response.tx_response {
478            if tx_response.code != 0 {
479                anyhow::bail!(
480                    "Transaction broadcast failed: code={}, log={}",
481                    tx_response.code,
482                    tx_response.raw_log
483                );
484            }
485            Ok(tx_response.txhash)
486        } else {
487            Err(anyhow::anyhow!(
488                "Broadcast response does not contain tx_response"
489            ))
490        }
491    }
492
493    /// Query transaction by hash.
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if the query fails.
498    pub async fn get_tx(&mut self, hash: &str) -> Result<cosmrs::Tx, anyhow::Error> {
499        let req = GetTxRequest {
500            hash: hash.to_string(),
501        };
502        let response = self.tx.get_tx(req).await?.into_inner();
503
504        if let Some(tx) = response.tx {
505            // Convert through bytes since the types are incompatible
506            let tx_bytes = tx.encode_to_vec();
507            cosmrs::Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
508        } else {
509            anyhow::bail!("Transaction not found")
510        }
511    }
512}
513
514#[cfg(test)]
515mod tests {
516    use rstest::rstest;
517
518    use super::*;
519
520    #[rstest]
521    fn test_height_ordering() {
522        let h1 = Height(100);
523        let h2 = Height(200);
524        assert!(h1 < h2);
525        assert_eq!(h1, Height(100));
526    }
527
528    #[tokio::test]
529    async fn test_new_with_fallback_empty_urls() {
530        let result = DydxGrpcClient::new_with_fallback(&[] as &[&str]).await;
531        assert!(result.is_err());
532        if let Err(DydxError::Config(msg)) = result {
533            assert_eq!(msg, "No gRPC URLs provided");
534        } else {
535            panic!("Expected Config error");
536        }
537    }
538
539    #[tokio::test]
540    async fn test_new_with_fallback_invalid_urls() {
541        // Test with invalid URLs that will fail to connect
542        let invalid_urls = vec!["invalid://bad-url", "http://0.0.0.0:1"];
543        let result = DydxGrpcClient::new_with_fallback(&invalid_urls).await;
544
545        // Should fail with either Config or Grpc error
546        assert!(result.is_err());
547    }
548}