1use prost::Message as ProstMessage;
22use tonic::transport::Channel;
23
24use crate::{
25 error::DydxError,
26 proto::{
27 AccountAuthenticator, AccountPlusClient, GetAuthenticatorsRequest,
28 cosmos_sdk_proto::cosmos::{
29 auth::v1beta1::{
30 BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
31 },
32 bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
33 base::{
34 tendermint::v1beta1::{
35 Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
36 service_client::ServiceClient as BaseClient,
37 },
38 v1beta1::Coin,
39 },
40 tx::v1beta1::{
41 BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
42 service_client::ServiceClient as TxClient,
43 },
44 },
45 dydxprotocol::{
46 clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
47 perpetuals::{
48 Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
49 },
50 subaccounts::{
51 QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
52 query_client::QueryClient as SubaccountsClient,
53 },
54 },
55 },
56};
57
58pub type TxHash = String;
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
63pub struct Height(pub u32);
64
65#[derive(Debug, Clone)]
73pub struct DydxGrpcClient {
74 channel: Channel,
75 auth: AuthClient<Channel>,
76 bank: BankClient<Channel>,
77 base: BaseClient<Channel>,
78 tx: TxClient<Channel>,
79 clob: ClobClient<Channel>,
80 perpetuals: PerpetualsClient<Channel>,
81 subaccounts: SubaccountsClient<Channel>,
82 accountplus: AccountPlusClient<Channel>,
83 current_url: String,
84}
85
86impl DydxGrpcClient {
87 pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
93 let mut endpoint = Channel::from_shared(grpc_url.clone())
94 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?;
95
96 if grpc_url.starts_with("https://") {
98 let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
99 endpoint = endpoint
100 .tls_config(tls)
101 .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
102 }
103
104 let channel = endpoint.connect().await.map_err(|e| {
105 DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
106 "Connection failed: {e}"
107 ))))
108 })?;
109
110 Ok(Self {
111 auth: AuthClient::new(channel.clone()),
112 bank: BankClient::new(channel.clone()),
113 base: BaseClient::new(channel.clone()),
114 tx: TxClient::new(channel.clone()),
115 clob: ClobClient::new(channel.clone()),
116 perpetuals: PerpetualsClient::new(channel.clone()),
117 subaccounts: SubaccountsClient::new(channel.clone()),
118 accountplus: AccountPlusClient::new(channel.clone()),
119 channel,
120 current_url: grpc_url,
121 })
122 }
123
124 pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
134 if grpc_urls.is_empty() {
135 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
136 }
137
138 let mut last_error = None;
139
140 for (idx, url) in grpc_urls.iter().enumerate() {
141 let url_str = url.as_ref();
142 log::debug!(
143 "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
144 idx + 1,
145 grpc_urls.len()
146 );
147
148 match Self::new(url_str.to_string()).await {
149 Ok(client) => {
150 log::info!("Successfully connected to gRPC node: {url_str}");
151 return Ok(client);
152 }
153 Err(e) => {
154 log::warn!("Failed to connect to gRPC node {url_str}: {e}");
155 last_error = Some(e);
156 }
157 }
158 }
159
160 Err(last_error.unwrap_or_else(|| {
161 DydxError::Grpc(Box::new(tonic::Status::unavailable(
162 "All gRPC connection attempts failed".to_string(),
163 )))
164 }))
165 }
166
167 pub async fn reconnect_with_fallback(
177 &mut self,
178 grpc_urls: &[impl AsRef<str>],
179 ) -> Result<(), DydxError> {
180 if grpc_urls.is_empty() {
181 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
182 }
183
184 let mut last_error = None;
185
186 for (idx, url) in grpc_urls.iter().enumerate() {
187 let url_str = url.as_ref();
188
189 if url_str == self.current_url {
191 log::debug!("Skipping current URL: {url_str}");
192 continue;
193 }
194
195 log::debug!(
196 "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
197 idx + 1,
198 grpc_urls.len()
199 );
200
201 let mut endpoint = match Channel::from_shared(url_str.to_string())
202 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
203 {
204 Ok(ep) => ep,
205 Err(e) => {
206 last_error = Some(e);
207 continue;
208 }
209 };
210
211 if url_str.starts_with("https://") {
213 let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
214 endpoint = match endpoint.tls_config(tls) {
215 Ok(ep) => ep,
216 Err(e) => {
217 last_error = Some(DydxError::Config(format!("TLS config failed: {e}")));
218 continue;
219 }
220 };
221 }
222
223 match endpoint.connect().await {
224 Ok(connected_channel) => {
225 log::info!("Successfully reconnected to gRPC node: {url_str}");
226
227 self.channel = connected_channel.clone();
229 self.auth = AuthClient::new(connected_channel.clone());
230 self.bank = BankClient::new(connected_channel.clone());
231 self.base = BaseClient::new(connected_channel.clone());
232 self.tx = TxClient::new(connected_channel.clone());
233 self.clob = ClobClient::new(connected_channel.clone());
234 self.perpetuals = PerpetualsClient::new(connected_channel.clone());
235 self.subaccounts = SubaccountsClient::new(connected_channel);
236 self.current_url = url_str.to_string();
237
238 return Ok(());
239 }
240 Err(e) => {
241 log::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
242 last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
243 format!("Connection failed: {e}"),
244 ))));
245 }
246 }
247 }
248
249 Err(last_error.unwrap_or_else(|| {
250 DydxError::Grpc(Box::new(tonic::Status::unavailable(
251 "All gRPC reconnection attempts failed".to_string(),
252 )))
253 }))
254 }
255
256 #[must_use]
258 pub fn current_url(&self) -> &str {
259 &self.current_url
260 }
261
262 #[must_use]
266 pub fn channel(&self) -> &Channel {
267 &self.channel
268 }
269
270 pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
278 let req = QueryAccountRequest {
279 address: address.to_string(),
280 };
281 let resp = self
282 .auth
283 .account(req)
284 .await?
285 .into_inner()
286 .account
287 .ok_or_else(|| {
288 anyhow::anyhow!("Query account request failure, account should exist")
289 })?;
290
291 let account = BaseAccount::decode(&*resp.value)?;
292 Ok((account.account_number, account.sequence))
293 }
294
295 pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
302 let req = QueryAccountRequest {
303 address: address.to_string(),
304 };
305 let resp = self
306 .auth
307 .account(req)
308 .await?
309 .into_inner()
310 .account
311 .ok_or_else(|| {
312 anyhow::anyhow!("Query account request failure, account should exist")
313 })?;
314
315 Ok(BaseAccount::decode(&*resp.value)?)
316 }
317
318 pub async fn get_account_balances(
325 &mut self,
326 address: &str,
327 ) -> Result<Vec<Coin>, anyhow::Error> {
328 let req = QueryAllBalancesRequest {
329 address: address.to_string(),
330 resolve_denom: false,
331 pagination: None,
332 };
333 let balances = self.bank.all_balances(req).await?.into_inner().balances;
334 Ok(balances)
335 }
336
337 pub async fn get_authenticators(
346 &mut self,
347 address: &str,
348 ) -> Result<Vec<AccountAuthenticator>, anyhow::Error> {
349 let req = GetAuthenticatorsRequest {
350 account: address.to_string(),
351 };
352 let resp = self.accountplus.get_authenticators(req).await?.into_inner();
353 Ok(resp.account_authenticators)
354 }
355
356 pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
362 let req = GetNodeInfoRequest {};
363 let info = self.base.get_node_info(req).await?.into_inner();
364 Ok(info)
365 }
366
367 pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
373 let req = GetLatestBlockRequest::default();
374 let latest_block = self
375 .base
376 .get_latest_block(req)
377 .await?
378 .into_inner()
379 .sdk_block
380 .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
381 Ok(latest_block)
382 }
383
384 pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
390 let latest_block = self.latest_block().await?;
391 let header = latest_block
392 .header
393 .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
394 let height = Height(header.height.try_into()?);
395 Ok(height)
396 }
397
398 pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
404 let req = QueryAllPerpetualsRequest { pagination: None };
405 let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
406 Ok(response.perpetual)
407 }
408
409 pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
415 let req = QueryAllClobPairRequest { pagination: None };
416 let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
417 Ok(pairs)
418 }
419
420 pub async fn get_subaccount(
426 &mut self,
427 address: &str,
428 number: u32,
429 ) -> Result<SubaccountInfo, anyhow::Error> {
430 let req = QueryGetSubaccountRequest {
431 owner: address.to_string(),
432 number,
433 };
434 let subaccount = self
435 .subaccounts
436 .subaccount(req)
437 .await?
438 .into_inner()
439 .subaccount
440 .ok_or_else(|| {
441 anyhow::anyhow!("Subaccount query response does not contain subaccount")
442 })?;
443 Ok(subaccount)
444 }
445
446 #[allow(deprecated)]
452 pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
453 let req = SimulateRequest { tx_bytes, tx: None };
454 let gas_used = self
455 .tx
456 .simulate(req)
457 .await?
458 .into_inner()
459 .gas_info
460 .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
461 .gas_used;
462 Ok(gas_used)
463 }
464
465 pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
471 let req = BroadcastTxRequest {
472 tx_bytes,
473 mode: BroadcastMode::Sync as i32,
474 };
475 let response = self.tx.broadcast_tx(req).await?.into_inner();
476
477 if let Some(tx_response) = response.tx_response {
478 if tx_response.code != 0 {
479 anyhow::bail!(
480 "Transaction broadcast failed: code={}, log={}",
481 tx_response.code,
482 tx_response.raw_log
483 );
484 }
485 Ok(tx_response.txhash)
486 } else {
487 Err(anyhow::anyhow!(
488 "Broadcast response does not contain tx_response"
489 ))
490 }
491 }
492
493 pub async fn get_tx(&mut self, hash: &str) -> Result<cosmrs::Tx, anyhow::Error> {
499 let req = GetTxRequest {
500 hash: hash.to_string(),
501 };
502 let response = self.tx.get_tx(req).await?.into_inner();
503
504 if let Some(tx) = response.tx {
505 let tx_bytes = tx.encode_to_vec();
507 cosmrs::Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
508 } else {
509 anyhow::bail!("Transaction not found")
510 }
511 }
512}
513
514#[cfg(test)]
515mod tests {
516 use rstest::rstest;
517
518 use super::*;
519
520 #[rstest]
521 fn test_height_ordering() {
522 let h1 = Height(100);
523 let h2 = Height(200);
524 assert!(h1 < h2);
525 assert_eq!(h1, Height(100));
526 }
527
528 #[tokio::test]
529 async fn test_new_with_fallback_empty_urls() {
530 let result = DydxGrpcClient::new_with_fallback(&[] as &[&str]).await;
531 assert!(result.is_err());
532 if let Err(DydxError::Config(msg)) = result {
533 assert_eq!(msg, "No gRPC URLs provided");
534 } else {
535 panic!("Expected Config error");
536 }
537 }
538
539 #[tokio::test]
540 async fn test_new_with_fallback_invalid_urls() {
541 let invalid_urls = vec!["invalid://bad-url", "http://0.0.0.0:1"];
543 let result = DydxGrpcClient::new_with_fallback(&invalid_urls).await;
544
545 assert!(result.is_err());
547 }
548}