nautilus_blockchain/rpc/core.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
21use reqwest::header::USER_AGENT;
22use tokio_tungstenite::tungstenite::Message;
23
24use crate::rpc::{
25 error::BlockchainRpcClientError,
26 types::{BlockchainMessage, RpcEventType},
27 utils::{
28 extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
29 },
30};
31
32/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
33///
34/// It provides a shared implementation of common blockchain RPC functionality, handling:
35/// - WebSocket connection management with blockchain RPC node.
36/// - Subscription lifecycle (creation, tracking, and termination).
37/// - Message serialization and deserialization of RPC messages.
38/// - Event type mapping and dispatching.
39#[derive(Debug)]
40pub struct CoreBlockchainRpcClient {
41 /// The blockchain network type this client connects to.
42 chain: Chain,
43 /// WebSocket secure URL for the blockchain node's RPC endpoint.
44 wss_rpc_url: String,
45 /// Auto-incrementing counter for generating unique RPC request IDs.
46 request_id: u64,
47 /// Tracks in-flight subscription requests by mapping request IDs to their event types.
48 pending_subscription_request: HashMap<u64, RpcEventType>,
49 /// Maps active subscription IDs to their corresponding event types for message
50 /// deserialization.
51 subscription_event_types: HashMap<String, RpcEventType>,
52 /// The active WebSocket client connection.
53 wss_client: Option<Arc<WebSocketClient>>,
54 /// Channel receiver for consuming WebSocket messages.
55 wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
56}
57
58impl CoreBlockchainRpcClient {
59 #[must_use]
60 pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
61 Self {
62 chain,
63 wss_rpc_url,
64 request_id: 1,
65 wss_client: None,
66 pending_subscription_request: HashMap::new(),
67 subscription_event_types: HashMap::new(),
68 wss_consumer_rx: None,
69 }
70 }
71
72 /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
73 ///
74 /// # Errors
75 ///
76 /// Returns an error if the WebSocket connection fails.
77 pub async fn connect(&mut self) -> anyhow::Result<()> {
78 let (handler, rx) = channel_message_handler();
79 let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
80 // Most of the blockchain rpc nodes require a heartbeat to keep the connection alive
81 let heartbeat_interval = 30;
82 let config = WebSocketConfig {
83 url: self.wss_rpc_url.clone(),
84 headers: vec![user_agent],
85 message_handler: Some(handler),
86 heartbeat: Some(heartbeat_interval),
87 heartbeat_msg: None,
88 ping_handler: None,
89 reconnect_timeout_ms: Some(5_000),
90 reconnect_delay_initial_ms: None,
91 reconnect_delay_max_ms: None,
92 reconnect_backoff_factor: None,
93 reconnect_jitter_ms: None,
94 };
95 let client = WebSocketClient::connect(
96 config,
97 None, // post_reconnection
98 vec![], // keyed_quotas
99 None, // default_quota
100 )
101 .await?;
102
103 self.wss_client = Some(Arc::new(client));
104 self.wss_consumer_rx = Some(rx);
105
106 Ok(())
107 }
108
109 /// Registers a subscription for the specified event type and records it internally with the given ID.
110 async fn subscribe_events(
111 &mut self,
112 event_type: RpcEventType,
113 subscription_id: String,
114 ) -> Result<(), BlockchainRpcClientError> {
115 if let Some(client) = &self.wss_client {
116 log::info!("Subscribing to new blocks on chain '{}'", self.chain.name);
117 let msg = serde_json::json!({
118 "method": "eth_subscribe",
119 "id": self.request_id,
120 "jsonrpc": "2.0",
121 "params": [subscription_id]
122 });
123 self.pending_subscription_request
124 .insert(self.request_id, event_type);
125 self.request_id += 1;
126 if let Err(err) = client.send_text(msg.to_string(), None).await {
127 log::error!("Error sending subscribe message: {err:?}");
128 }
129 Ok(())
130 } else {
131 Err(BlockchainRpcClientError::ClientError(String::from(
132 "Client not connected",
133 )))
134 }
135 }
136
137 /// Terminates a subscription with the blockchain node using the provided subscription ID.
138 async fn unsubscribe_events(
139 &self,
140 subscription_id: String,
141 ) -> Result<(), BlockchainRpcClientError> {
142 if let Some(client) = &self.wss_client {
143 log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
144 let msg = serde_json::json!({
145 "method": "eth_unsubscribe",
146 "id": 1,
147 "jsonrpc": "2.0",
148 "params": [subscription_id]
149 });
150 if let Err(err) = client.send_text(msg.to_string(), None).await {
151 log::error!("Error sending unsubscribe message: {err:?}");
152 }
153 Ok(())
154 } else {
155 Err(BlockchainRpcClientError::ClientError(String::from(
156 "Client not connected",
157 )))
158 }
159 }
160
161 /// Waits for and returns the next available message from the WebSocket channel.
162 pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
163 match &mut self.wss_consumer_rx {
164 Some(rx) => rx.recv().await,
165 None => None,
166 }
167 }
168
169 /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
170 ///
171 /// # Panics
172 ///
173 /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
178 pub async fn next_rpc_message(
179 &mut self,
180 ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
181 while let Some(msg) = self.wait_on_rpc_channel().await {
182 match msg {
183 Message::Text(text) => match serde_json::from_str::<serde_json::Value>(&text) {
184 Ok(json) => {
185 if is_subscription_confirmation_response(&json) {
186 let subscription_request_id = json.get("id").unwrap().as_u64().unwrap();
187 let result = json.get("result").unwrap().as_str().unwrap();
188 let event_type = self
189 .pending_subscription_request
190 .get(&subscription_request_id)
191 .unwrap();
192 self.subscription_event_types
193 .insert(result.to_string(), event_type.clone());
194 self.pending_subscription_request
195 .remove(&subscription_request_id);
196 continue;
197 } else if is_subscription_event(&json) {
198 let subscription_id = match extract_rpc_subscription_id(&json) {
199 Some(id) => id,
200 None => {
201 return Err(BlockchainRpcClientError::InternalRpcClientError(
202 "Error parsing subscription id from valid rpc response"
203 .to_string(),
204 ));
205 }
206 };
207 if let Some(event_type) =
208 self.subscription_event_types.get(subscription_id)
209 {
210 match event_type {
211 RpcEventType::NewBlock => {
212 return match serde_json::from_value::<
213 RpcNodeWssResponse<Block>,
214 >(json)
215 {
216 Ok(block_response) => {
217 let block = block_response.params.result;
218 Ok(BlockchainMessage::Block(block))
219 }
220 Err(e) => {
221 Err(BlockchainRpcClientError::MessageParsingError(
222 format!(
223 "Error parsing rpc response to block with error {e}"
224 ),
225 ))
226 }
227 };
228 }
229 }
230 }
231 return Err(BlockchainRpcClientError::InternalRpcClientError(format!(
232 "Event type not found for defined subscription id {subscription_id}"
233 )));
234 }
235 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
236 json.to_string(),
237 ));
238 }
239 Err(e) => {
240 return Err(BlockchainRpcClientError::MessageParsingError(e.to_string()));
241 }
242 },
243 Message::Pong(_) => {
244 continue;
245 }
246 _ => {
247 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
248 msg.to_string(),
249 ));
250 }
251 }
252 }
253
254 Err(BlockchainRpcClientError::NoMessageReceived)
255 }
256
257 /// Subscribes to real-time block updates from the blockchain node.
258 ///
259 /// # Errors
260 ///
261 /// Returns an error if the subscription request fails or if the client is not connected.
262 pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
263 self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
264 .await
265 }
266
267 /// Cancels the subscription to real-time block updates.
268 ///
269 /// # Errors
270 ///
271 /// Returns an error if the unsubscription request fails or if the client is not connected.
272 pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
273 self.unsubscribe_events(String::from("newHeads")).await?;
274
275 // Find and remove the subscription ID associated with the newBlock event type
276 let subscription_ids_to_remove: Vec<String> = self
277 .subscription_event_types
278 .iter()
279 .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
280 .map(|(id, _)| id.clone())
281 .collect();
282
283 for id in subscription_ids_to_remove {
284 self.subscription_event_types.remove(&id);
285 }
286 Ok(())
287 }
288}