1use prost::Message as ProstMessage;
22use tonic::transport::Channel;
23
24use crate::{
25 error::DydxError,
26 proto::{
27 cosmos_sdk_proto::cosmos::{
28 auth::v1beta1::{
29 BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
30 },
31 bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
32 base::{
33 tendermint::v1beta1::{
34 Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
35 service_client::ServiceClient as BaseClient,
36 },
37 v1beta1::Coin,
38 },
39 tx::v1beta1::{
40 BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
41 service_client::ServiceClient as TxClient,
42 },
43 },
44 dydxprotocol::{
45 clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
46 perpetuals::{
47 Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
48 },
49 subaccounts::{
50 QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
51 query_client::QueryClient as SubaccountsClient,
52 },
53 },
54 },
55};
56
/// Transaction hash string, as returned in the node's broadcast response
/// (see `DydxGrpcClient::broadcast_tx`).
pub type TxHash = String;
59
/// Block height newtype.
///
/// Produced by `DydxGrpcClient::latest_block_height` from the `height` field of the
/// latest block header. Ordering and equality compare the wrapped number.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Height(pub u32);
63
/// gRPC client bundle for a dYdX node.
///
/// Holds one connected [`Channel`] plus one generated service client per module, each
/// built from a clone of that channel, and remembers the URL it is connected to.
#[derive(Debug, Clone)]
pub struct DydxGrpcClient {
    /// The connected channel every service client below was built from.
    channel: Channel,
    /// Cosmos `auth` query client (account lookups).
    auth: AuthClient<Channel>,
    /// Cosmos `bank` query client (balances).
    bank: BankClient<Channel>,
    /// Cosmos base tendermint service client (node info, latest block).
    base: BaseClient<Channel>,
    /// Cosmos `tx` service client (simulate, broadcast, get transaction).
    tx: TxClient<Channel>,
    /// dYdX `clob` query client (CLOB pairs).
    clob: ClobClient<Channel>,
    /// dYdX `perpetuals` query client.
    perpetuals: PerpetualsClient<Channel>,
    /// dYdX `subaccounts` query client.
    subaccounts: SubaccountsClient<Channel>,
    /// URL of the node this client is currently connected to.
    current_url: String,
}
83
84impl DydxGrpcClient {
85 pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
91 let mut endpoint = Channel::from_shared(grpc_url.clone())
92 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?;
93
94 if grpc_url.starts_with("https://") {
96 let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
97 endpoint = endpoint
98 .tls_config(tls)
99 .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
100 }
101
102 let channel = endpoint.connect().await.map_err(|e| {
103 DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
104 "Connection failed: {e}"
105 ))))
106 })?;
107
108 Ok(Self {
109 auth: AuthClient::new(channel.clone()),
110 bank: BankClient::new(channel.clone()),
111 base: BaseClient::new(channel.clone()),
112 tx: TxClient::new(channel.clone()),
113 clob: ClobClient::new(channel.clone()),
114 perpetuals: PerpetualsClient::new(channel.clone()),
115 subaccounts: SubaccountsClient::new(channel.clone()),
116 channel,
117 current_url: grpc_url,
118 })
119 }
120
121 pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
131 if grpc_urls.is_empty() {
132 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
133 }
134
135 let mut last_error = None;
136
137 for (idx, url) in grpc_urls.iter().enumerate() {
138 let url_str = url.as_ref();
139 tracing::debug!(
140 "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
141 idx + 1,
142 grpc_urls.len()
143 );
144
145 match Self::new(url_str.to_string()).await {
146 Ok(client) => {
147 tracing::info!("Successfully connected to gRPC node: {url_str}");
148 return Ok(client);
149 }
150 Err(e) => {
151 tracing::warn!("Failed to connect to gRPC node {url_str}: {e}");
152 last_error = Some(e);
153 }
154 }
155 }
156
157 Err(last_error.unwrap_or_else(|| {
158 DydxError::Grpc(Box::new(tonic::Status::unavailable(
159 "All gRPC connection attempts failed".to_string(),
160 )))
161 }))
162 }
163
164 pub async fn reconnect_with_fallback(
174 &mut self,
175 grpc_urls: &[impl AsRef<str>],
176 ) -> Result<(), DydxError> {
177 if grpc_urls.is_empty() {
178 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
179 }
180
181 let mut last_error = None;
182
183 for (idx, url) in grpc_urls.iter().enumerate() {
184 let url_str = url.as_ref();
185
186 if url_str == self.current_url {
188 tracing::debug!("Skipping current URL: {url_str}");
189 continue;
190 }
191
192 tracing::debug!(
193 "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
194 idx + 1,
195 grpc_urls.len()
196 );
197
198 let channel = match Channel::from_shared(url_str.to_string())
199 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
200 {
201 Ok(ch) => ch,
202 Err(e) => {
203 last_error = Some(e);
204 continue;
205 }
206 };
207
208 match channel.connect().await {
209 Ok(connected_channel) => {
210 tracing::info!("Successfully reconnected to gRPC node: {url_str}");
211
212 self.channel = connected_channel.clone();
214 self.auth = AuthClient::new(connected_channel.clone());
215 self.bank = BankClient::new(connected_channel.clone());
216 self.base = BaseClient::new(connected_channel.clone());
217 self.tx = TxClient::new(connected_channel.clone());
218 self.clob = ClobClient::new(connected_channel.clone());
219 self.perpetuals = PerpetualsClient::new(connected_channel.clone());
220 self.subaccounts = SubaccountsClient::new(connected_channel);
221 self.current_url = url_str.to_string();
222
223 return Ok(());
224 }
225 Err(e) => {
226 tracing::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
227 last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
228 format!("Connection failed: {e}"),
229 ))));
230 }
231 }
232 }
233
234 Err(last_error.unwrap_or_else(|| {
235 DydxError::Grpc(Box::new(tonic::Status::unavailable(
236 "All gRPC reconnection attempts failed".to_string(),
237 )))
238 }))
239 }
240
    /// Returns the URL of the node this client is currently connected to.
    #[must_use]
    pub fn current_url(&self) -> &str {
        &self.current_url
    }
246
    /// Returns a reference to the connected channel the service clients were built from.
    #[must_use]
    pub fn channel(&self) -> &Channel {
        &self.channel
    }
254
255 pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
263 let req = QueryAccountRequest {
264 address: address.to_string(),
265 };
266 let resp = self
267 .auth
268 .account(req)
269 .await?
270 .into_inner()
271 .account
272 .ok_or_else(|| {
273 anyhow::anyhow!("Query account request failure, account should exist")
274 })?;
275
276 let account = BaseAccount::decode(&*resp.value)?;
277 Ok((account.account_number, account.sequence))
278 }
279
280 pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
287 let req = QueryAccountRequest {
288 address: address.to_string(),
289 };
290 let resp = self
291 .auth
292 .account(req)
293 .await?
294 .into_inner()
295 .account
296 .ok_or_else(|| {
297 anyhow::anyhow!("Query account request failure, account should exist")
298 })?;
299
300 Ok(BaseAccount::decode(&*resp.value)?)
301 }
302
303 pub async fn get_account_balances(
310 &mut self,
311 address: &str,
312 ) -> Result<Vec<Coin>, anyhow::Error> {
313 let req = QueryAllBalancesRequest {
314 address: address.to_string(),
315 resolve_denom: false,
316 pagination: None,
317 };
318 let balances = self.bank.all_balances(req).await?.into_inner().balances;
319 Ok(balances)
320 }
321
322 pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
328 let req = GetNodeInfoRequest {};
329 let info = self.base.get_node_info(req).await?.into_inner();
330 Ok(info)
331 }
332
333 pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
339 let req = GetLatestBlockRequest::default();
340 let latest_block = self
341 .base
342 .get_latest_block(req)
343 .await?
344 .into_inner()
345 .sdk_block
346 .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
347 Ok(latest_block)
348 }
349
350 pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
356 let latest_block = self.latest_block().await?;
357 let header = latest_block
358 .header
359 .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
360 let height = Height(header.height.try_into()?);
361 Ok(height)
362 }
363
364 pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
370 let req = QueryAllPerpetualsRequest { pagination: None };
371 let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
372 Ok(response.perpetual)
373 }
374
375 pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
381 let req = QueryAllClobPairRequest { pagination: None };
382 let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
383 Ok(pairs)
384 }
385
386 pub async fn get_subaccount(
392 &mut self,
393 address: &str,
394 number: u32,
395 ) -> Result<SubaccountInfo, anyhow::Error> {
396 let req = QueryGetSubaccountRequest {
397 owner: address.to_string(),
398 number,
399 };
400 let subaccount = self
401 .subaccounts
402 .subaccount(req)
403 .await?
404 .into_inner()
405 .subaccount
406 .ok_or_else(|| {
407 anyhow::anyhow!("Subaccount query response does not contain subaccount")
408 })?;
409 Ok(subaccount)
410 }
411
412 #[allow(deprecated)]
418 pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
419 let req = SimulateRequest { tx_bytes, tx: None };
420 let gas_used = self
421 .tx
422 .simulate(req)
423 .await?
424 .into_inner()
425 .gas_info
426 .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
427 .gas_used;
428 Ok(gas_used)
429 }
430
431 pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
437 let req = BroadcastTxRequest {
438 tx_bytes,
439 mode: BroadcastMode::Sync as i32,
440 };
441 let response = self.tx.broadcast_tx(req).await?.into_inner();
442
443 if let Some(tx_response) = response.tx_response {
444 if tx_response.code != 0 {
445 anyhow::bail!(
446 "Transaction broadcast failed: code={}, log={}",
447 tx_response.code,
448 tx_response.raw_log
449 );
450 }
451 Ok(tx_response.txhash)
452 } else {
453 Err(anyhow::anyhow!(
454 "Broadcast response does not contain tx_response"
455 ))
456 }
457 }
458
459 pub async fn get_tx(&mut self, hash: &str) -> Result<cosmrs::Tx, anyhow::Error> {
465 let req = GetTxRequest {
466 hash: hash.to_string(),
467 };
468 let response = self.tx.get_tx(req).await?.into_inner();
469
470 if let Some(tx) = response.tx {
471 let tx_bytes = tx.encode_to_vec();
473 cosmrs::Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
474 } else {
475 anyhow::bail!("Transaction not found")
476 }
477 }
478}
479
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// `Height` orders and compares by its wrapped number.
    #[rstest]
    fn test_height_ordering() {
        let lower = Height(100);
        let higher = Height(200);
        assert!(lower < higher);
        assert_eq!(lower, Height(100));
    }

    /// An empty URL list is rejected up front with a `Config` error.
    #[tokio::test]
    async fn test_new_with_fallback_empty_urls() {
        let no_urls: &[&str] = &[];
        let result = DydxGrpcClient::new_with_fallback(no_urls).await;
        assert!(result.is_err());
        match result {
            Err(DydxError::Config(msg)) => assert_eq!(msg, "No gRPC URLs provided"),
            _ => panic!("Expected Config error"),
        }
    }

    /// When none of the URLs can be connected to, an error is returned.
    #[tokio::test]
    async fn test_new_with_fallback_invalid_urls() {
        let invalid_urls = ["invalid://bad-url", "http://0.0.0.0:1"];
        let result = DydxGrpcClient::new_with_fallback(&invalid_urls[..]).await;

        assert!(result.is_err());
    }
}