nautilus_blockchain/rpc/
http.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, num::NonZeroU32, str::FromStr};
17
18use alloy::primitives::{Address, U256};
19use bytes::Bytes;
20use nautilus_model::defi::rpc::{RpcLog, RpcNodeHttpResponse};
21use nautilus_network::{
22    http::{HttpClient, Method},
23    ratelimiter::quota::Quota,
24};
25use serde::de::DeserializeOwned;
26
27use crate::rpc::error::BlockchainRpcClientError;
28
29/// Client for making HTTP-based RPC requests to blockchain nodes.
30///
31/// This client is designed to interact with Ethereum-compatible blockchain networks, providing
32/// methods to execute RPC calls and handle responses in a type-safe manner.
33#[derive(Debug)]
34pub struct BlockchainHttpRpcClient {
35    /// The HTTP URL for the blockchain node's RPC endpoint.
36    http_rpc_url: String,
37    /// The HTTP client for making RPC http-based requests.
38    http_client: HttpClient,
39}
40
41impl BlockchainHttpRpcClient {
42    /// Creates a new HTTP RPC client with the given endpoint URL and optional rate limit.
43    ///
44    /// # Panics
45    ///
46    /// Panics if `rpc_request_per_second` is `Some(0)`, since a zero rate limit is invalid.
47    #[must_use]
48    pub fn new(http_rpc_url: String, rpc_request_per_second: Option<u32>) -> Self {
49        let default_quota = rpc_request_per_second.map(|rpc_request_per_second| {
50            Quota::per_second(NonZeroU32::new(rpc_request_per_second).unwrap())
51        });
52        let http_client = HttpClient::new(
53            HashMap::new(),
54            vec![],
55            Vec::new(),
56            default_quota,
57            None, // timeout_secs
58            None, // proxy_url
59        )
60        .expect("Failed to create HTTP client");
61        Self {
62            http_rpc_url,
63            http_client,
64        }
65    }
66
67    /// Generic method that sends a JSON-RPC request and returns the raw response in bytes.
68    async fn send_rpc_request(
69        &self,
70        rpc_request: serde_json::Value,
71    ) -> Result<Bytes, BlockchainRpcClientError> {
72        let body_bytes = serde_json::to_vec(&rpc_request).map_err(|e| {
73            BlockchainRpcClientError::ClientError(format!("Failed to serialize request: {e}"))
74        })?;
75
76        let mut headers = HashMap::new();
77        headers.insert("Content-Type".to_string(), "application/json".to_string());
78
79        match self
80            .http_client
81            .request(
82                Method::POST,
83                self.http_rpc_url.clone(),
84                None,
85                Some(headers),
86                Some(body_bytes),
87                None,
88                None,
89            )
90            .await
91        {
92            Ok(response) => Ok(response.body),
93            Err(e) => Err(BlockchainRpcClientError::ClientError(e.to_string())),
94        }
95    }
96
97    /// Executes an Ethereum JSON-RPC call and deserializes the response into the specified type T.
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the HTTP RPC request fails or the response cannot be parsed.
102    pub async fn execute_rpc_call<T: DeserializeOwned>(
103        &self,
104        rpc_request: serde_json::Value,
105    ) -> anyhow::Result<T> {
106        match self.send_rpc_request(rpc_request).await {
107            Ok(bytes) => match serde_json::from_slice::<RpcNodeHttpResponse<T>>(bytes.as_ref()) {
108                Ok(parsed) => {
109                    // Check for non-standard rate limit error (e.g., Infura)
110                    // These responses have code/message at top level without jsonrpc field
111                    if parsed.jsonrpc.is_none()
112                        && let (Some(code), Some(message)) = (parsed.code, parsed.message)
113                    {
114                        anyhow::bail!("RPC provider error {code}: {message}");
115                    }
116
117                    if let Some(error) = parsed.error {
118                        Err(anyhow::anyhow!(
119                            "RPC error {}: {}",
120                            error.code,
121                            error.message
122                        ))
123                    } else if let Some(result) = parsed.result {
124                        Ok(result)
125                    } else {
126                        Err(anyhow::anyhow!(
127                            "Response missing both result and error fields"
128                        ))
129                    }
130                }
131                Err(e) => {
132                    // Try to convert bytes to string for better error reporting
133                    let raw_response = String::from_utf8_lossy(bytes.as_ref());
134                    let preview = if raw_response.len() > 500 {
135                        format!(
136                            "{}... (truncated, {} bytes total)",
137                            &raw_response[..500],
138                            raw_response.len()
139                        )
140                    } else {
141                        raw_response.to_string()
142                    };
143
144                    Err(anyhow::anyhow!(
145                        "Failed to parse eth call response: {e}\nRaw response: {preview}"
146                    ))
147                }
148            },
149            Err(e) => Err(anyhow::anyhow!(
150                "Failed to execute eth call RPC request: {e}"
151            )),
152        }
153    }
154
155    /// Creates a properly formatted `eth_call` JSON-RPC request object targeting a specific contract address with encoded function data.
156    #[must_use]
157    pub fn construct_eth_call(
158        &self,
159        to: &str,
160        call_data: &[u8],
161        block: Option<u64>,
162    ) -> serde_json::Value {
163        let encoded_data = format!("0x{}", hex::encode(call_data));
164        let call = serde_json::json!({
165            "to": to,
166            "data": encoded_data
167        });
168
169        let block_param = if let Some(block_number) = block {
170            serde_json::json!(format!("0x{:x}", block_number))
171        } else {
172            serde_json::json!("latest")
173        };
174
175        serde_json::json!({
176            "jsonrpc": "2.0",
177            "id": 1,
178            "method": "eth_call",
179            "params": [call, block_param]
180        })
181    }
182
183    /// Retrieves the balance of the specified Ethereum address at the given block.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the RPC call fails or if the returned balance string cannot be parsed as a valid U256.
188    pub async fn get_balance(&self, address: &Address, block: Option<u64>) -> anyhow::Result<U256> {
189        let block_param = if let Some(block_number) = block {
190            serde_json::json!(format!("0x{:x}", block_number))
191        } else {
192            serde_json::json!("latest")
193        };
194
195        let request = serde_json::json!({
196            "jsonrpc": "2.0",
197            "id": 1,
198            "method": "eth_getBalance",
199            "params": [address, block_param]
200        });
201        let hex_string: String = self.execute_rpc_call(request).await?;
202
203        U256::from_str(&hex_string)
204            .map_err(|e| anyhow::anyhow!("Failed to parse balance hex string '{hex_string}': {e}"))
205    }
206
207    /// Retrieves logs matching the given filter criteria.
208    ///
209    /// This method calls the `eth_getLogs` RPC method to fetch event logs from the blockchain.
210    /// It's commonly used for querying historical events like token transfers, swaps, etc.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the RPC call fails or the response cannot be parsed.
215    pub async fn get_logs(
216        &self,
217        address: Option<&Address>,
218        topics: Option<Vec<Option<String>>>,
219        from_block: u64,
220        to_block: u64,
221    ) -> anyhow::Result<Vec<RpcLog>> {
222        let mut filter = serde_json::Map::new();
223
224        filter.insert(
225            "fromBlock".to_string(),
226            serde_json::json!(format!("0x{:x}", from_block)),
227        );
228        filter.insert(
229            "toBlock".to_string(),
230            serde_json::json!(format!("0x{:x}", to_block)),
231        );
232
233        if let Some(addr) = address {
234            filter.insert(
235                "address".to_string(),
236                serde_json::json!(format!("{:?}", addr)),
237            );
238        }
239
240        if let Some(topics) = topics {
241            filter.insert("topics".to_string(), serde_json::json!(topics));
242        }
243
244        let request = serde_json::json!({
245            "jsonrpc": "2.0",
246            "id": 1,
247            "method": "eth_getLogs",
248            "params": [filter]
249        });
250
251        self.execute_rpc_call(request).await
252    }
253}