1use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21 RECONNECTED,
22 websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
23};
24use reqwest::header::USER_AGENT;
25use tokio::sync::RwLock;
26use tokio_tungstenite::tungstenite::Message;
27
28use crate::rpc::{
29 error::BlockchainRpcClientError,
30 types::{BlockchainMessage, RpcEventType},
31 utils::{
32 extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
33 },
34};
35
/// WebSocket JSON-RPC client state for a single blockchain node endpoint.
///
/// Holds the connection handle, the channel on which raw WebSocket messages
/// arrive, and the bookkeeping used to match subscription confirmations and
/// subscription events back to their [`RpcEventType`].
pub struct CoreBlockchainRpcClient {
    /// Chain this client is bound to; `chain.name` is used in log messages.
    chain: Chain,
    /// WebSocket URL handed to the underlying client when connecting.
    wss_rpc_url: String,
    /// Next JSON-RPC request id; starts at 1 and is incremented after each subscribe request.
    request_id: u64,
    /// Subscribe requests that were sent but not yet confirmed, keyed by request id.
    pending_subscription_request: HashMap<u64, RpcEventType>,
    /// Event type for each subscription id taken from a confirmation response's `result`.
    subscription_event_types: HashMap<String, RpcEventType>,
    /// Underlying WebSocket client; `None` until `connect` succeeds.
    wss_client: Option<Arc<WebSocketClient>>,
    /// Receiving half of the message-handler channel; `None` until `connect` succeeds.
    wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
    /// Requested subscriptions (event type -> `eth_subscribe` parameter), replayed after a reconnect.
    subscriptions: Arc<RwLock<HashMap<RpcEventType, String>>>,
}
63
64impl Debug for CoreBlockchainRpcClient {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("CoreBlockchainRpcClient")
67 .field("chain", &self.chain)
68 .field("wss_rpc_url", &self.wss_rpc_url)
69 .field("request_id", &self.request_id)
70 .field(
71 "pending_subscription_request",
72 &self.pending_subscription_request,
73 )
74 .field("subscription_event_types", &self.subscription_event_types)
75 .field(
76 "wss_client",
77 &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
78 )
79 .field(
80 "wss_consumer_rx",
81 &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
82 )
83 .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
84 .finish()
85 }
86}
87
88impl CoreBlockchainRpcClient {
89 #[must_use]
90 pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
91 Self {
92 chain,
93 wss_rpc_url,
94 request_id: 1,
95 wss_client: None,
96 pending_subscription_request: HashMap::new(),
97 subscription_event_types: HashMap::new(),
98 wss_consumer_rx: None,
99 subscriptions: Arc::new(RwLock::new(HashMap::new())),
100 }
101 }
102
103 pub async fn connect(&mut self) -> anyhow::Result<()> {
112 let (handler, rx) = channel_message_handler();
113 let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
114
115 let heartbeat_interval = 30;
117
118 let config = WebSocketConfig {
119 url: self.wss_rpc_url.clone(),
120 headers: vec![user_agent],
121 message_handler: Some(handler),
122 heartbeat: Some(heartbeat_interval),
123 heartbeat_msg: None,
124 ping_handler: None,
125 reconnect_timeout_ms: Some(10_000),
126 reconnect_delay_initial_ms: Some(1_000),
127 reconnect_delay_max_ms: Some(30_000),
128 reconnect_backoff_factor: Some(2.0),
129 reconnect_jitter_ms: Some(1_000),
130 };
131
132 let client = WebSocketClient::connect(config, None, vec![], None).await?;
133
134 self.wss_client = Some(Arc::new(client));
135 self.wss_consumer_rx = Some(rx);
136
137 Ok(())
138 }
139
140 async fn subscribe_events(
142 &mut self,
143 event_type: RpcEventType,
144 subscription_id: String,
145 ) -> Result<(), BlockchainRpcClientError> {
146 if let Some(client) = &self.wss_client {
147 log::info!(
148 "Subscribing to '{}' on chain '{}'",
149 subscription_id,
150 self.chain.name
151 );
152 let msg = serde_json::json!({
153 "method": "eth_subscribe",
154 "id": self.request_id,
155 "jsonrpc": "2.0",
156 "params": [subscription_id]
157 });
158 self.pending_subscription_request
159 .insert(self.request_id, event_type.clone());
160 self.request_id += 1;
161 if let Err(e) = client.send_text(msg.to_string(), None).await {
162 log::error!("Error sending subscribe message: {e:?}");
163 }
164
165 let mut confirmed = self.subscriptions.write().await;
167 confirmed.insert(event_type, subscription_id);
168
169 Ok(())
170 } else {
171 Err(BlockchainRpcClientError::ClientError(String::from(
172 "Client not connected",
173 )))
174 }
175 }
176
177 async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
179 let subscriptions = self.subscriptions.read().await;
180
181 if subscriptions.is_empty() {
182 log::debug!(
183 "No subscriptions to re-establish for chain '{}'",
184 self.chain.name
185 );
186 return Ok(());
187 }
188
189 log::info!(
190 "Re-establishing {} subscription(s) for chain '{}'",
191 subscriptions.len(),
192 self.chain.name
193 );
194
195 let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
196 .iter()
197 .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
198 .collect();
199
200 drop(subscriptions);
201
202 for (event_type, subscription_id) in subs_to_restore {
203 if let Some(client) = &self.wss_client {
204 log::debug!(
205 "Re-subscribing to '{}' on chain '{}'",
206 subscription_id,
207 self.chain.name
208 );
209
210 let msg = serde_json::json!({
211 "method": "eth_subscribe",
212 "id": self.request_id,
213 "jsonrpc": "2.0",
214 "params": [subscription_id]
215 });
216
217 self.pending_subscription_request
218 .insert(self.request_id, event_type);
219 self.request_id += 1;
220
221 if let Err(e) = client.send_text(msg.to_string(), None).await {
222 log::error!("Error re-subscribing after reconnection: {e:?}");
223 }
224 }
225 }
226
227 Ok(())
228 }
229
230 async fn unsubscribe_events(
232 &self,
233 subscription_id: String,
234 ) -> Result<(), BlockchainRpcClientError> {
235 if let Some(client) = &self.wss_client {
236 log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
237 let msg = serde_json::json!({
238 "method": "eth_unsubscribe",
239 "id": 1,
240 "jsonrpc": "2.0",
241 "params": [subscription_id]
242 });
243 if let Err(e) = client.send_text(msg.to_string(), None).await {
244 log::error!("Error sending unsubscribe message: {e:?}");
245 }
246 Ok(())
247 } else {
248 Err(BlockchainRpcClientError::ClientError(String::from(
249 "Client not connected",
250 )))
251 }
252 }
253
254 pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
256 match &mut self.wss_consumer_rx {
257 Some(rx) => rx.recv().await,
258 None => None,
259 }
260 }
261
262 pub async fn next_rpc_message(
274 &mut self,
275 ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
276 while let Some(msg) = self.wait_on_rpc_channel().await {
277 match msg {
278 Message::Text(text) => {
279 if text == RECONNECTED {
280 log::info!("Detected reconnection for chain '{}'", self.chain.name);
281 if let Err(e) = self.resubscribe_all().await {
282 log::error!("Failed to re-establish subscriptions: {e:?}");
283 }
284 continue;
285 }
286
287 match serde_json::from_str::<serde_json::Value>(&text) {
288 Ok(json) => {
289 if is_subscription_confirmation_response(&json) {
290 let subscription_request_id =
291 json.get("id").unwrap().as_u64().unwrap();
292 let result = json.get("result").unwrap().as_str().unwrap();
293 let event_type = self
294 .pending_subscription_request
295 .get(&subscription_request_id)
296 .unwrap();
297 self.subscription_event_types
298 .insert(result.to_string(), event_type.clone());
299 self.pending_subscription_request
300 .remove(&subscription_request_id);
301 continue;
302 } else if is_subscription_event(&json) {
303 let subscription_id = match extract_rpc_subscription_id(&json) {
304 Some(id) => id,
305 None => {
306 return Err(BlockchainRpcClientError::InternalRpcClientError(
307 "Error parsing subscription id from valid rpc response"
308 .to_string(),
309 ));
310 }
311 };
312 if let Some(event_type) =
313 self.subscription_event_types.get(subscription_id)
314 {
315 match event_type {
316 RpcEventType::NewBlock => {
317 return match serde_json::from_value::<
318 RpcNodeWssResponse<Block>,
319 >(
320 json
321 ) {
322 Ok(block_response) => {
323 let block = block_response.params.result;
324 Ok(BlockchainMessage::Block(block))
325 }
326 Err(e) => Err(
327 BlockchainRpcClientError::MessageParsingError(
328 format!(
329 "Error parsing rpc response to block with error {e}"
330 ),
331 ),
332 ),
333 };
334 }
335 }
336 }
337 return Err(BlockchainRpcClientError::InternalRpcClientError(
338 format!(
339 "Event type not found for defined subscription id {subscription_id}"
340 ),
341 ));
342 }
343 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
344 json.to_string(),
345 ));
346 }
347 Err(e) => {
348 return Err(BlockchainRpcClientError::MessageParsingError(
349 e.to_string(),
350 ));
351 }
352 }
353 }
354 Message::Pong(_) => {
355 continue;
356 }
357 _ => {
358 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
359 msg.to_string(),
360 ));
361 }
362 }
363 }
364
365 Err(BlockchainRpcClientError::NoMessageReceived)
366 }
367
368 pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
374 self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
375 .await
376 }
377
378 pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
384 self.unsubscribe_events(String::from("newHeads")).await?;
385
386 let subscription_ids_to_remove: Vec<String> = self
387 .subscription_event_types
388 .iter()
389 .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
390 .map(|(id, _)| id.clone())
391 .collect();
392
393 for id in subscription_ids_to_remove {
394 self.subscription_event_types.remove(&id);
395 }
396
397 let mut confirmed = self.subscriptions.write().await;
398 confirmed.remove(&RpcEventType::NewBlock);
399
400 Ok(())
401 }
402}