1use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21 RECONNECTED,
22 http::USER_AGENT,
23 websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
24};
25use tokio_tungstenite::tungstenite::Message;
26
27use crate::rpc::{
28 error::BlockchainRpcClientError,
29 types::{BlockchainMessage, RpcEventType},
30 utils::{
31 extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
32 },
33};
34
/// Client for a blockchain node's WebSocket RPC endpoint.
///
/// Holds the connection handle, the inbound message channel, and the bookkeeping used to match
/// subscription confirmations and subscription events back to their [`RpcEventType`].
pub struct CoreBlockchainRpcClient {
    /// Chain this client is associated with; its `name` is used in log messages.
    chain: Chain,
    /// WebSocket URL handed to the underlying client when connecting.
    wss_rpc_url: String,
    /// Id assigned to the next `eth_subscribe` request; starts at 1 and grows by one per request.
    request_id: u64,
    /// Subscribe requests that were sent but not yet confirmed, keyed by request id.
    pending_subscription_request: HashMap<u64, RpcEventType>,
    /// Event type for each subscription id reported by the node in a confirmation response.
    subscription_event_types: HashMap<String, RpcEventType>,
    /// WebSocket client handle; `None` until `connect` succeeds.
    wss_client: Option<Arc<WebSocketClient>>,
    /// Receiving end of the channel fed by the WebSocket message handler; `None` until `connect`
    /// succeeds.
    wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
    /// Requested subscriptions (event type -> name sent in the request `params`), replayed by
    /// `resubscribe_all` after a reconnection.
    subscriptions: Arc<tokio::sync::RwLock<HashMap<RpcEventType, String>>>,
}
62
63impl Debug for CoreBlockchainRpcClient {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct(stringify!(CoreBlockchainRpcClient))
66 .field("chain", &self.chain)
67 .field("wss_rpc_url", &self.wss_rpc_url)
68 .field("request_id", &self.request_id)
69 .field(
70 "pending_subscription_request",
71 &self.pending_subscription_request,
72 )
73 .field("subscription_event_types", &self.subscription_event_types)
74 .field(
75 "wss_client",
76 &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
77 )
78 .field(
79 "wss_consumer_rx",
80 &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
81 )
82 .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
83 .finish()
84 }
85}
86
87impl CoreBlockchainRpcClient {
88 #[must_use]
89 pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
90 Self {
91 chain,
92 wss_rpc_url,
93 request_id: 1,
94 wss_client: None,
95 pending_subscription_request: HashMap::new(),
96 subscription_event_types: HashMap::new(),
97 wss_consumer_rx: None,
98 subscriptions: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
99 }
100 }
101
102 pub async fn connect(&mut self) -> anyhow::Result<()> {
111 let (handler, rx) = channel_message_handler();
112 let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
113
114 let heartbeat_interval = 30;
116
117 let config = WebSocketConfig {
118 url: self.wss_rpc_url.clone(),
119 headers: vec![user_agent],
120 heartbeat: Some(heartbeat_interval),
121 heartbeat_msg: None,
122 reconnect_timeout_ms: Some(10_000),
123 reconnect_delay_initial_ms: Some(1_000),
124 reconnect_delay_max_ms: Some(30_000),
125 reconnect_backoff_factor: Some(2.0),
126 reconnect_jitter_ms: Some(1_000),
127 reconnect_max_attempts: None,
128 };
129
130 let client =
131 WebSocketClient::connect(config, Some(handler), None, None, vec![], None).await?;
132
133 self.wss_client = Some(Arc::new(client));
134 self.wss_consumer_rx = Some(rx);
135
136 Ok(())
137 }
138
139 async fn subscribe_events(
141 &mut self,
142 event_type: RpcEventType,
143 subscription_id: String,
144 ) -> Result<(), BlockchainRpcClientError> {
145 if let Some(client) = &self.wss_client {
146 log::info!(
147 "Subscribing to '{}' on chain '{}'",
148 subscription_id,
149 self.chain.name
150 );
151 let msg = serde_json::json!({
152 "method": "eth_subscribe",
153 "id": self.request_id,
154 "jsonrpc": "2.0",
155 "params": [subscription_id]
156 });
157 self.pending_subscription_request
158 .insert(self.request_id, event_type.clone());
159 self.request_id += 1;
160 if let Err(e) = client.send_text(msg.to_string(), None).await {
161 log::error!("Error sending subscribe message: {e:?}");
162 }
163
164 let mut confirmed = self.subscriptions.write().await;
166 confirmed.insert(event_type, subscription_id);
167
168 Ok(())
169 } else {
170 Err(BlockchainRpcClientError::ClientError(String::from(
171 "Client not connected",
172 )))
173 }
174 }
175
176 async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
178 let subscriptions = self.subscriptions.read().await;
179
180 if subscriptions.is_empty() {
181 log::debug!(
182 "No subscriptions to re-establish for chain '{}'",
183 self.chain.name
184 );
185 return Ok(());
186 }
187
188 log::info!(
189 "Re-establishing {} subscription(s) for chain '{}'",
190 subscriptions.len(),
191 self.chain.name
192 );
193
194 let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
195 .iter()
196 .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
197 .collect();
198
199 drop(subscriptions);
200
201 for (event_type, subscription_id) in subs_to_restore {
202 if let Some(client) = &self.wss_client {
203 log::debug!(
204 "Re-subscribing to '{}' on chain '{}'",
205 subscription_id,
206 self.chain.name
207 );
208
209 let msg = serde_json::json!({
210 "method": "eth_subscribe",
211 "id": self.request_id,
212 "jsonrpc": "2.0",
213 "params": [subscription_id]
214 });
215
216 self.pending_subscription_request
217 .insert(self.request_id, event_type);
218 self.request_id += 1;
219
220 if let Err(e) = client.send_text(msg.to_string(), None).await {
221 log::error!("Error re-subscribing after reconnection: {e:?}");
222 }
223 }
224 }
225
226 Ok(())
227 }
228
229 async fn unsubscribe_events(
231 &self,
232 subscription_id: String,
233 ) -> Result<(), BlockchainRpcClientError> {
234 if let Some(client) = &self.wss_client {
235 log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
236 let msg = serde_json::json!({
237 "method": "eth_unsubscribe",
238 "id": 1,
239 "jsonrpc": "2.0",
240 "params": [subscription_id]
241 });
242 if let Err(e) = client.send_text(msg.to_string(), None).await {
243 log::error!("Error sending unsubscribe message: {e:?}");
244 }
245 Ok(())
246 } else {
247 Err(BlockchainRpcClientError::ClientError(String::from(
248 "Client not connected",
249 )))
250 }
251 }
252
253 pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
255 match &mut self.wss_consumer_rx {
256 Some(rx) => rx.recv().await,
257 None => None,
258 }
259 }
260
261 pub async fn next_rpc_message(
273 &mut self,
274 ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
275 while let Some(msg) = self.wait_on_rpc_channel().await {
276 match msg {
277 Message::Text(text) => {
278 if text == RECONNECTED {
279 log::info!("Detected reconnection for chain '{}'", self.chain.name);
280 if let Err(e) = self.resubscribe_all().await {
281 log::error!("Failed to re-establish subscriptions: {e:?}");
282 }
283 continue;
284 }
285
286 match serde_json::from_str::<serde_json::Value>(&text) {
287 Ok(json) => {
288 if is_subscription_confirmation_response(&json) {
289 let subscription_request_id =
290 json.get("id").unwrap().as_u64().unwrap();
291 let result = json.get("result").unwrap().as_str().unwrap();
292 let event_type = self
293 .pending_subscription_request
294 .get(&subscription_request_id)
295 .unwrap();
296 self.subscription_event_types
297 .insert(result.to_string(), event_type.clone());
298 self.pending_subscription_request
299 .remove(&subscription_request_id);
300 continue;
301 } else if is_subscription_event(&json) {
302 let subscription_id = match extract_rpc_subscription_id(&json) {
303 Some(id) => id,
304 None => {
305 return Err(BlockchainRpcClientError::InternalRpcClientError(
306 "Error parsing subscription id from valid rpc response"
307 .to_string(),
308 ));
309 }
310 };
311 if let Some(event_type) =
312 self.subscription_event_types.get(subscription_id)
313 {
314 match event_type {
315 RpcEventType::NewBlock => {
316 return match serde_json::from_value::<
317 RpcNodeWssResponse<Block>,
318 >(
319 json
320 ) {
321 Ok(block_response) => {
322 let block = block_response.params.result;
323 Ok(BlockchainMessage::Block(block))
324 }
325 Err(e) => Err(
326 BlockchainRpcClientError::MessageParsingError(
327 format!(
328 "Error parsing rpc response to block with error {e}"
329 ),
330 ),
331 ),
332 };
333 }
334 }
335 }
336 return Err(BlockchainRpcClientError::InternalRpcClientError(
337 format!(
338 "Event type not found for defined subscription id {subscription_id}"
339 ),
340 ));
341 }
342 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
343 json.to_string(),
344 ));
345 }
346 Err(e) => {
347 return Err(BlockchainRpcClientError::MessageParsingError(
348 e.to_string(),
349 ));
350 }
351 }
352 }
353 Message::Pong(_) => {
354 continue;
355 }
356 _ => {
357 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
358 msg.to_string(),
359 ));
360 }
361 }
362 }
363
364 Err(BlockchainRpcClientError::NoMessageReceived)
365 }
366
367 pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
373 self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
374 .await
375 }
376
377 pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
383 self.unsubscribe_events(String::from("newHeads")).await?;
384
385 let subscription_ids_to_remove: Vec<String> = self
386 .subscription_event_types
387 .iter()
388 .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
389 .map(|(id, _)| id.clone())
390 .collect();
391
392 for id in subscription_ids_to_remove {
393 self.subscription_event_types.remove(&id);
394 }
395
396 let mut confirmed = self.subscriptions.write().await;
397 confirmed.remove(&RpcEventType::NewBlock);
398
399 Ok(())
400 }
401}