1use prost::Message as ProstMessage;
22use tonic::transport::Channel;
23
24use crate::{
25 error::DydxError,
26 proto::{
27 cosmos_sdk_proto::cosmos::{
28 auth::v1beta1::{
29 BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
30 },
31 bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
32 base::{
33 tendermint::v1beta1::{
34 Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
35 service_client::ServiceClient as BaseClient,
36 },
37 v1beta1::Coin,
38 },
39 tx::v1beta1::{
40 BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
41 service_client::ServiceClient as TxClient,
42 },
43 },
44 dydxprotocol::{
45 clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
46 perpetuals::{
47 Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
48 },
49 subaccounts::{
50 QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
51 query_client::QueryClient as SubaccountsClient,
52 },
53 },
54 },
55};
56
57pub type TxHash = String;
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
62pub struct Height(pub u32);
63
64#[derive(Debug, Clone)]
72pub struct DydxGrpcClient {
73 channel: Channel,
74 auth: AuthClient<Channel>,
75 bank: BankClient<Channel>,
76 base: BaseClient<Channel>,
77 tx: TxClient<Channel>,
78 clob: ClobClient<Channel>,
79 perpetuals: PerpetualsClient<Channel>,
80 subaccounts: SubaccountsClient<Channel>,
81 current_url: String,
82}
83
84impl DydxGrpcClient {
85 pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
91 let mut endpoint = Channel::from_shared(grpc_url.clone())
92 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?;
93
94 if grpc_url.starts_with("https://") {
96 let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
97 endpoint = endpoint
98 .tls_config(tls)
99 .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
100 }
101
102 let channel = endpoint.connect().await.map_err(|e| {
103 DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
104 "Connection failed: {e}"
105 ))))
106 })?;
107
108 Ok(Self {
109 auth: AuthClient::new(channel.clone()),
110 bank: BankClient::new(channel.clone()),
111 base: BaseClient::new(channel.clone()),
112 tx: TxClient::new(channel.clone()),
113 clob: ClobClient::new(channel.clone()),
114 perpetuals: PerpetualsClient::new(channel.clone()),
115 subaccounts: SubaccountsClient::new(channel.clone()),
116 channel,
117 current_url: grpc_url,
118 })
119 }
120
121 pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
131 if grpc_urls.is_empty() {
132 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
133 }
134
135 let mut last_error = None;
136
137 for (idx, url) in grpc_urls.iter().enumerate() {
138 let url_str = url.as_ref();
139 log::debug!(
140 "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
141 idx + 1,
142 grpc_urls.len()
143 );
144
145 match Self::new(url_str.to_string()).await {
146 Ok(client) => {
147 log::info!("Successfully connected to gRPC node: {url_str}");
148 return Ok(client);
149 }
150 Err(e) => {
151 log::warn!("Failed to connect to gRPC node {url_str}: {e}");
152 last_error = Some(e);
153 }
154 }
155 }
156
157 Err(last_error.unwrap_or_else(|| {
158 DydxError::Grpc(Box::new(tonic::Status::unavailable(
159 "All gRPC connection attempts failed".to_string(),
160 )))
161 }))
162 }
163
164 pub async fn reconnect_with_fallback(
174 &mut self,
175 grpc_urls: &[impl AsRef<str>],
176 ) -> Result<(), DydxError> {
177 if grpc_urls.is_empty() {
178 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
179 }
180
181 let mut last_error = None;
182
183 for (idx, url) in grpc_urls.iter().enumerate() {
184 let url_str = url.as_ref();
185
186 if url_str == self.current_url {
188 log::debug!("Skipping current URL: {url_str}");
189 continue;
190 }
191
192 log::debug!(
193 "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
194 idx + 1,
195 grpc_urls.len()
196 );
197
198 let mut endpoint = match Channel::from_shared(url_str.to_string())
199 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
200 {
201 Ok(ep) => ep,
202 Err(e) => {
203 last_error = Some(e);
204 continue;
205 }
206 };
207
208 if url_str.starts_with("https://") {
210 let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
211 endpoint = match endpoint.tls_config(tls) {
212 Ok(ep) => ep,
213 Err(e) => {
214 last_error = Some(DydxError::Config(format!("TLS config failed: {e}")));
215 continue;
216 }
217 };
218 }
219
220 match endpoint.connect().await {
221 Ok(connected_channel) => {
222 log::info!("Successfully reconnected to gRPC node: {url_str}");
223
224 self.channel = connected_channel.clone();
226 self.auth = AuthClient::new(connected_channel.clone());
227 self.bank = BankClient::new(connected_channel.clone());
228 self.base = BaseClient::new(connected_channel.clone());
229 self.tx = TxClient::new(connected_channel.clone());
230 self.clob = ClobClient::new(connected_channel.clone());
231 self.perpetuals = PerpetualsClient::new(connected_channel.clone());
232 self.subaccounts = SubaccountsClient::new(connected_channel);
233 self.current_url = url_str.to_string();
234
235 return Ok(());
236 }
237 Err(e) => {
238 log::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
239 last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
240 format!("Connection failed: {e}"),
241 ))));
242 }
243 }
244 }
245
246 Err(last_error.unwrap_or_else(|| {
247 DydxError::Grpc(Box::new(tonic::Status::unavailable(
248 "All gRPC reconnection attempts failed".to_string(),
249 )))
250 }))
251 }
252
253 #[must_use]
255 pub fn current_url(&self) -> &str {
256 &self.current_url
257 }
258
259 #[must_use]
263 pub fn channel(&self) -> &Channel {
264 &self.channel
265 }
266
267 pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
275 let req = QueryAccountRequest {
276 address: address.to_string(),
277 };
278 let resp = self
279 .auth
280 .account(req)
281 .await?
282 .into_inner()
283 .account
284 .ok_or_else(|| {
285 anyhow::anyhow!("Query account request failure, account should exist")
286 })?;
287
288 let account = BaseAccount::decode(&*resp.value)?;
289 Ok((account.account_number, account.sequence))
290 }
291
292 pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
299 let req = QueryAccountRequest {
300 address: address.to_string(),
301 };
302 let resp = self
303 .auth
304 .account(req)
305 .await?
306 .into_inner()
307 .account
308 .ok_or_else(|| {
309 anyhow::anyhow!("Query account request failure, account should exist")
310 })?;
311
312 Ok(BaseAccount::decode(&*resp.value)?)
313 }
314
315 pub async fn get_account_balances(
322 &mut self,
323 address: &str,
324 ) -> Result<Vec<Coin>, anyhow::Error> {
325 let req = QueryAllBalancesRequest {
326 address: address.to_string(),
327 resolve_denom: false,
328 pagination: None,
329 };
330 let balances = self.bank.all_balances(req).await?.into_inner().balances;
331 Ok(balances)
332 }
333
334 pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
340 let req = GetNodeInfoRequest {};
341 let info = self.base.get_node_info(req).await?.into_inner();
342 Ok(info)
343 }
344
345 pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
351 let req = GetLatestBlockRequest::default();
352 let latest_block = self
353 .base
354 .get_latest_block(req)
355 .await?
356 .into_inner()
357 .sdk_block
358 .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
359 Ok(latest_block)
360 }
361
362 pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
368 let latest_block = self.latest_block().await?;
369 let header = latest_block
370 .header
371 .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
372 let height = Height(header.height.try_into()?);
373 Ok(height)
374 }
375
376 pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
382 let req = QueryAllPerpetualsRequest { pagination: None };
383 let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
384 Ok(response.perpetual)
385 }
386
387 pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
393 let req = QueryAllClobPairRequest { pagination: None };
394 let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
395 Ok(pairs)
396 }
397
398 pub async fn get_subaccount(
404 &mut self,
405 address: &str,
406 number: u32,
407 ) -> Result<SubaccountInfo, anyhow::Error> {
408 let req = QueryGetSubaccountRequest {
409 owner: address.to_string(),
410 number,
411 };
412 let subaccount = self
413 .subaccounts
414 .subaccount(req)
415 .await?
416 .into_inner()
417 .subaccount
418 .ok_or_else(|| {
419 anyhow::anyhow!("Subaccount query response does not contain subaccount")
420 })?;
421 Ok(subaccount)
422 }
423
424 #[allow(deprecated)]
430 pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
431 let req = SimulateRequest { tx_bytes, tx: None };
432 let gas_used = self
433 .tx
434 .simulate(req)
435 .await?
436 .into_inner()
437 .gas_info
438 .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
439 .gas_used;
440 Ok(gas_used)
441 }
442
443 pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
449 let req = BroadcastTxRequest {
450 tx_bytes,
451 mode: BroadcastMode::Sync as i32,
452 };
453 let response = self.tx.broadcast_tx(req).await?.into_inner();
454
455 if let Some(tx_response) = response.tx_response {
456 if tx_response.code != 0 {
457 anyhow::bail!(
458 "Transaction broadcast failed: code={}, log={}",
459 tx_response.code,
460 tx_response.raw_log
461 );
462 }
463 Ok(tx_response.txhash)
464 } else {
465 Err(anyhow::anyhow!(
466 "Broadcast response does not contain tx_response"
467 ))
468 }
469 }
470
471 pub async fn get_tx(&mut self, hash: &str) -> Result<cosmrs::Tx, anyhow::Error> {
477 let req = GetTxRequest {
478 hash: hash.to_string(),
479 };
480 let response = self.tx.get_tx(req).await?.into_inner();
481
482 if let Some(tx) = response.tx {
483 let tx_bytes = tx.encode_to_vec();
485 cosmrs::Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
486 } else {
487 anyhow::bail!("Transaction not found")
488 }
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use rstest::rstest;
495
496 use super::*;
497
498 #[rstest]
499 fn test_height_ordering() {
500 let h1 = Height(100);
501 let h2 = Height(200);
502 assert!(h1 < h2);
503 assert_eq!(h1, Height(100));
504 }
505
506 #[tokio::test]
507 async fn test_new_with_fallback_empty_urls() {
508 let result = DydxGrpcClient::new_with_fallback(&[] as &[&str]).await;
509 assert!(result.is_err());
510 if let Err(DydxError::Config(msg)) = result {
511 assert_eq!(msg, "No gRPC URLs provided");
512 } else {
513 panic!("Expected Config error");
514 }
515 }
516
517 #[tokio::test]
518 async fn test_new_with_fallback_invalid_urls() {
519 let invalid_urls = vec!["invalid://bad-url", "http://0.0.0.0:1"];
521 let result = DydxGrpcClient::new_with_fallback(&invalid_urls).await;
522
523 assert!(result.is_err());
525 }
526}