1use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21 RECONNECTED,
22 websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
23};
24use reqwest::header::USER_AGENT;
25use tokio::sync::RwLock;
26use tokio_tungstenite::tungstenite::Message;
27
28use crate::rpc::{
29 error::BlockchainRpcClientError,
30 types::{BlockchainMessage, RpcEventType},
31 utils::{
32 extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
33 },
34};
35
36pub struct CoreBlockchainRpcClient {
45 chain: Chain,
47 wss_rpc_url: String,
49 request_id: u64,
51 pending_subscription_request: HashMap<u64, RpcEventType>,
53 subscription_event_types: HashMap<String, RpcEventType>,
56 wss_client: Option<Arc<WebSocketClient>>,
58 wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
60 subscriptions: Arc<RwLock<HashMap<RpcEventType, String>>>,
62}
63
64impl Debug for CoreBlockchainRpcClient {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("CoreBlockchainRpcClient")
67 .field("chain", &self.chain)
68 .field("wss_rpc_url", &self.wss_rpc_url)
69 .field("request_id", &self.request_id)
70 .field(
71 "pending_subscription_request",
72 &self.pending_subscription_request,
73 )
74 .field("subscription_event_types", &self.subscription_event_types)
75 .field(
76 "wss_client",
77 &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
78 )
79 .field(
80 "wss_consumer_rx",
81 &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
82 )
83 .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
84 .finish()
85 }
86}
87
88impl CoreBlockchainRpcClient {
89 #[must_use]
90 pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
91 Self {
92 chain,
93 wss_rpc_url,
94 request_id: 1,
95 wss_client: None,
96 pending_subscription_request: HashMap::new(),
97 subscription_event_types: HashMap::new(),
98 wss_consumer_rx: None,
99 subscriptions: Arc::new(RwLock::new(HashMap::new())),
100 }
101 }
102
103 pub async fn connect(&mut self) -> anyhow::Result<()> {
112 let (handler, rx) = channel_message_handler();
113 let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
114
115 let heartbeat_interval = 30;
117
118 let config = WebSocketConfig {
119 url: self.wss_rpc_url.clone(),
120 headers: vec![user_agent],
121 message_handler: Some(handler),
122 heartbeat: Some(heartbeat_interval),
123 heartbeat_msg: None,
124 ping_handler: None,
125 reconnect_timeout_ms: Some(10_000),
126 reconnect_delay_initial_ms: Some(1_000),
127 reconnect_delay_max_ms: Some(30_000),
128 reconnect_backoff_factor: Some(2.0),
129 reconnect_jitter_ms: Some(1_000),
130 reconnect_max_attempts: None,
131 };
132
133 let client = WebSocketClient::connect(config, None, vec![], None).await?;
134
135 self.wss_client = Some(Arc::new(client));
136 self.wss_consumer_rx = Some(rx);
137
138 Ok(())
139 }
140
141 async fn subscribe_events(
143 &mut self,
144 event_type: RpcEventType,
145 subscription_id: String,
146 ) -> Result<(), BlockchainRpcClientError> {
147 if let Some(client) = &self.wss_client {
148 log::info!(
149 "Subscribing to '{}' on chain '{}'",
150 subscription_id,
151 self.chain.name
152 );
153 let msg = serde_json::json!({
154 "method": "eth_subscribe",
155 "id": self.request_id,
156 "jsonrpc": "2.0",
157 "params": [subscription_id]
158 });
159 self.pending_subscription_request
160 .insert(self.request_id, event_type.clone());
161 self.request_id += 1;
162 if let Err(e) = client.send_text(msg.to_string(), None).await {
163 log::error!("Error sending subscribe message: {e:?}");
164 }
165
166 let mut confirmed = self.subscriptions.write().await;
168 confirmed.insert(event_type, subscription_id);
169
170 Ok(())
171 } else {
172 Err(BlockchainRpcClientError::ClientError(String::from(
173 "Client not connected",
174 )))
175 }
176 }
177
178 async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
180 let subscriptions = self.subscriptions.read().await;
181
182 if subscriptions.is_empty() {
183 log::debug!(
184 "No subscriptions to re-establish for chain '{}'",
185 self.chain.name
186 );
187 return Ok(());
188 }
189
190 log::info!(
191 "Re-establishing {} subscription(s) for chain '{}'",
192 subscriptions.len(),
193 self.chain.name
194 );
195
196 let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
197 .iter()
198 .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
199 .collect();
200
201 drop(subscriptions);
202
203 for (event_type, subscription_id) in subs_to_restore {
204 if let Some(client) = &self.wss_client {
205 log::debug!(
206 "Re-subscribing to '{}' on chain '{}'",
207 subscription_id,
208 self.chain.name
209 );
210
211 let msg = serde_json::json!({
212 "method": "eth_subscribe",
213 "id": self.request_id,
214 "jsonrpc": "2.0",
215 "params": [subscription_id]
216 });
217
218 self.pending_subscription_request
219 .insert(self.request_id, event_type);
220 self.request_id += 1;
221
222 if let Err(e) = client.send_text(msg.to_string(), None).await {
223 log::error!("Error re-subscribing after reconnection: {e:?}");
224 }
225 }
226 }
227
228 Ok(())
229 }
230
231 async fn unsubscribe_events(
233 &self,
234 subscription_id: String,
235 ) -> Result<(), BlockchainRpcClientError> {
236 if let Some(client) = &self.wss_client {
237 log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
238 let msg = serde_json::json!({
239 "method": "eth_unsubscribe",
240 "id": 1,
241 "jsonrpc": "2.0",
242 "params": [subscription_id]
243 });
244 if let Err(e) = client.send_text(msg.to_string(), None).await {
245 log::error!("Error sending unsubscribe message: {e:?}");
246 }
247 Ok(())
248 } else {
249 Err(BlockchainRpcClientError::ClientError(String::from(
250 "Client not connected",
251 )))
252 }
253 }
254
255 pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
257 match &mut self.wss_consumer_rx {
258 Some(rx) => rx.recv().await,
259 None => None,
260 }
261 }
262
263 pub async fn next_rpc_message(
275 &mut self,
276 ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
277 while let Some(msg) = self.wait_on_rpc_channel().await {
278 match msg {
279 Message::Text(text) => {
280 if text == RECONNECTED {
281 log::info!("Detected reconnection for chain '{}'", self.chain.name);
282 if let Err(e) = self.resubscribe_all().await {
283 log::error!("Failed to re-establish subscriptions: {e:?}");
284 }
285 continue;
286 }
287
288 match serde_json::from_str::<serde_json::Value>(&text) {
289 Ok(json) => {
290 if is_subscription_confirmation_response(&json) {
291 let subscription_request_id =
292 json.get("id").unwrap().as_u64().unwrap();
293 let result = json.get("result").unwrap().as_str().unwrap();
294 let event_type = self
295 .pending_subscription_request
296 .get(&subscription_request_id)
297 .unwrap();
298 self.subscription_event_types
299 .insert(result.to_string(), event_type.clone());
300 self.pending_subscription_request
301 .remove(&subscription_request_id);
302 continue;
303 } else if is_subscription_event(&json) {
304 let subscription_id = match extract_rpc_subscription_id(&json) {
305 Some(id) => id,
306 None => {
307 return Err(BlockchainRpcClientError::InternalRpcClientError(
308 "Error parsing subscription id from valid rpc response"
309 .to_string(),
310 ));
311 }
312 };
313 if let Some(event_type) =
314 self.subscription_event_types.get(subscription_id)
315 {
316 match event_type {
317 RpcEventType::NewBlock => {
318 return match serde_json::from_value::<
319 RpcNodeWssResponse<Block>,
320 >(
321 json
322 ) {
323 Ok(block_response) => {
324 let block = block_response.params.result;
325 Ok(BlockchainMessage::Block(block))
326 }
327 Err(e) => Err(
328 BlockchainRpcClientError::MessageParsingError(
329 format!(
330 "Error parsing rpc response to block with error {e}"
331 ),
332 ),
333 ),
334 };
335 }
336 }
337 }
338 return Err(BlockchainRpcClientError::InternalRpcClientError(
339 format!(
340 "Event type not found for defined subscription id {subscription_id}"
341 ),
342 ));
343 }
344 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
345 json.to_string(),
346 ));
347 }
348 Err(e) => {
349 return Err(BlockchainRpcClientError::MessageParsingError(
350 e.to_string(),
351 ));
352 }
353 }
354 }
355 Message::Pong(_) => {
356 continue;
357 }
358 _ => {
359 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
360 msg.to_string(),
361 ));
362 }
363 }
364 }
365
366 Err(BlockchainRpcClientError::NoMessageReceived)
367 }
368
369 pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
375 self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
376 .await
377 }
378
379 pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
385 self.unsubscribe_events(String::from("newHeads")).await?;
386
387 let subscription_ids_to_remove: Vec<String> = self
388 .subscription_event_types
389 .iter()
390 .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
391 .map(|(id, _)| id.clone())
392 .collect();
393
394 for id in subscription_ids_to_remove {
395 self.subscription_event_types.remove(&id);
396 }
397
398 let mut confirmed = self.subscriptions.write().await;
399 confirmed.remove(&RpcEventType::NewBlock);
400
401 Ok(())
402 }
403}