nautilus_blockchain/rpc/
http.rs1use std::{collections::HashMap, num::NonZeroU32, str::FromStr};
17
18use alloy::primitives::{Address, U256};
19use bytes::Bytes;
20use nautilus_model::defi::rpc::{RpcLog, RpcNodeHttpResponse};
21use nautilus_network::{
22 http::{HttpClient, Method},
23 ratelimiter::quota::Quota,
24};
25use serde::de::DeserializeOwned;
26
27use crate::rpc::error::BlockchainRpcClientError;
28
29#[derive(Debug)]
34pub struct BlockchainHttpRpcClient {
35 http_rpc_url: String,
37 http_client: HttpClient,
39}
40
41impl BlockchainHttpRpcClient {
42 #[must_use]
48 pub fn new(http_rpc_url: String, rpc_request_per_second: Option<u32>) -> Self {
49 let default_quota = rpc_request_per_second.map(|rpc_request_per_second| {
50 Quota::per_second(NonZeroU32::new(rpc_request_per_second).unwrap())
51 });
52 let http_client = HttpClient::new(
53 HashMap::new(),
54 vec![],
55 Vec::new(),
56 default_quota,
57 None, None, )
60 .expect("Failed to create HTTP client");
61 Self {
62 http_rpc_url,
63 http_client,
64 }
65 }
66
67 async fn send_rpc_request(
69 &self,
70 rpc_request: serde_json::Value,
71 ) -> Result<Bytes, BlockchainRpcClientError> {
72 let body_bytes = serde_json::to_vec(&rpc_request).map_err(|e| {
73 BlockchainRpcClientError::ClientError(format!("Failed to serialize request: {e}"))
74 })?;
75
76 let mut headers = HashMap::new();
77 headers.insert("Content-Type".to_string(), "application/json".to_string());
78
79 match self
80 .http_client
81 .request(
82 Method::POST,
83 self.http_rpc_url.clone(),
84 None,
85 Some(headers),
86 Some(body_bytes),
87 None,
88 None,
89 )
90 .await
91 {
92 Ok(response) => Ok(response.body),
93 Err(e) => Err(BlockchainRpcClientError::ClientError(e.to_string())),
94 }
95 }
96
97 pub async fn execute_rpc_call<T: DeserializeOwned>(
103 &self,
104 rpc_request: serde_json::Value,
105 ) -> anyhow::Result<T> {
106 match self.send_rpc_request(rpc_request).await {
107 Ok(bytes) => match serde_json::from_slice::<RpcNodeHttpResponse<T>>(bytes.as_ref()) {
108 Ok(parsed) => {
109 if parsed.jsonrpc.is_none()
112 && let (Some(code), Some(message)) = (parsed.code, parsed.message)
113 {
114 anyhow::bail!("RPC provider error {code}: {message}");
115 }
116
117 if let Some(error) = parsed.error {
118 Err(anyhow::anyhow!(
119 "RPC error {}: {}",
120 error.code,
121 error.message
122 ))
123 } else if let Some(result) = parsed.result {
124 Ok(result)
125 } else {
126 Err(anyhow::anyhow!(
127 "Response missing both result and error fields"
128 ))
129 }
130 }
131 Err(e) => {
132 let raw_response = String::from_utf8_lossy(bytes.as_ref());
134 let preview = if raw_response.len() > 500 {
135 format!(
136 "{}... (truncated, {} bytes total)",
137 &raw_response[..500],
138 raw_response.len()
139 )
140 } else {
141 raw_response.to_string()
142 };
143
144 Err(anyhow::anyhow!(
145 "Failed to parse eth call response: {e}\nRaw response: {preview}"
146 ))
147 }
148 },
149 Err(e) => Err(anyhow::anyhow!(
150 "Failed to execute eth call RPC request: {e}"
151 )),
152 }
153 }
154
155 #[must_use]
157 pub fn construct_eth_call(
158 &self,
159 to: &str,
160 call_data: &[u8],
161 block: Option<u64>,
162 ) -> serde_json::Value {
163 let encoded_data = format!("0x{}", hex::encode(call_data));
164 let call = serde_json::json!({
165 "to": to,
166 "data": encoded_data
167 });
168
169 let block_param = if let Some(block_number) = block {
170 serde_json::json!(format!("0x{:x}", block_number))
171 } else {
172 serde_json::json!("latest")
173 };
174
175 serde_json::json!({
176 "jsonrpc": "2.0",
177 "id": 1,
178 "method": "eth_call",
179 "params": [call, block_param]
180 })
181 }
182
183 pub async fn get_balance(&self, address: &Address, block: Option<u64>) -> anyhow::Result<U256> {
189 let block_param = if let Some(block_number) = block {
190 serde_json::json!(format!("0x{:x}", block_number))
191 } else {
192 serde_json::json!("latest")
193 };
194
195 let request = serde_json::json!({
196 "jsonrpc": "2.0",
197 "id": 1,
198 "method": "eth_getBalance",
199 "params": [address, block_param]
200 });
201 let hex_string: String = self.execute_rpc_call(request).await?;
202
203 U256::from_str(&hex_string)
204 .map_err(|e| anyhow::anyhow!("Failed to parse balance hex string '{hex_string}': {e}"))
205 }
206
207 pub async fn get_logs(
216 &self,
217 address: Option<&Address>,
218 topics: Option<Vec<Option<String>>>,
219 from_block: u64,
220 to_block: u64,
221 ) -> anyhow::Result<Vec<RpcLog>> {
222 let mut filter = serde_json::Map::new();
223
224 filter.insert(
225 "fromBlock".to_string(),
226 serde_json::json!(format!("0x{:x}", from_block)),
227 );
228 filter.insert(
229 "toBlock".to_string(),
230 serde_json::json!(format!("0x{:x}", to_block)),
231 );
232
233 if let Some(addr) = address {
234 filter.insert(
235 "address".to_string(),
236 serde_json::json!(format!("{:?}", addr)),
237 );
238 }
239
240 if let Some(topics) = topics {
241 filter.insert("topics".to_string(), serde_json::json!(topics));
242 }
243
244 let request = serde_json::json!({
245 "jsonrpc": "2.0",
246 "id": 1,
247 "method": "eth_getLogs",
248 "params": [filter]
249 });
250
251 self.execute_rpc_call(request).await
252 }
253}