nautilus_blockchain/rpc/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21    RECONNECTED,
22    websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
23};
24use reqwest::header::USER_AGENT;
25use tokio::sync::RwLock;
26use tokio_tungstenite::tungstenite::Message;
27
28use crate::rpc::{
29    error::BlockchainRpcClientError,
30    types::{BlockchainMessage, RpcEventType},
31    utils::{
32        extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
33    },
34};
35
36/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
37///
38/// It provides a shared implementation of common blockchain RPC functionality, handling:
39/// - WebSocket connection management with blockchain RPC node.
40/// - Subscription lifecycle (creation, tracking, and termination).
41/// - Message serialization and deserialization of RPC messages.
42/// - Event type mapping and dispatching.
43/// - Automatic subscription re-establishment on reconnection.
44pub struct CoreBlockchainRpcClient {
45    /// The blockchain network type this client connects to.
46    chain: Chain,
47    /// WebSocket secure URL for the blockchain node's RPC endpoint.
48    wss_rpc_url: String,
49    /// Auto-incrementing counter for generating unique RPC request IDs.
50    request_id: u64,
51    /// Tracks in-flight subscription requests by mapping request IDs to their event types.
52    pending_subscription_request: HashMap<u64, RpcEventType>,
53    /// Maps active subscription IDs to their corresponding event types for message
54    /// deserialization.
55    subscription_event_types: HashMap<String, RpcEventType>,
56    /// The active WebSocket client connection.
57    wss_client: Option<Arc<WebSocketClient>>,
58    /// Channel receiver for consuming WebSocket messages.
59    wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
60    /// Tracks confirmed subscriptions that need to be re-established on reconnection.
61    subscriptions: Arc<RwLock<HashMap<RpcEventType, String>>>,
62}
63
64impl Debug for CoreBlockchainRpcClient {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("CoreBlockchainRpcClient")
67            .field("chain", &self.chain)
68            .field("wss_rpc_url", &self.wss_rpc_url)
69            .field("request_id", &self.request_id)
70            .field(
71                "pending_subscription_request",
72                &self.pending_subscription_request,
73            )
74            .field("subscription_event_types", &self.subscription_event_types)
75            .field(
76                "wss_client",
77                &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
78            )
79            .field(
80                "wss_consumer_rx",
81                &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
82            )
83            .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
84            .finish()
85    }
86}
87
88impl CoreBlockchainRpcClient {
89    #[must_use]
90    pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
91        Self {
92            chain,
93            wss_rpc_url,
94            request_id: 1,
95            wss_client: None,
96            pending_subscription_request: HashMap::new(),
97            subscription_event_types: HashMap::new(),
98            wss_consumer_rx: None,
99            subscriptions: Arc::new(RwLock::new(HashMap::new())),
100        }
101    }
102
103    /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
104    ///
105    /// Configures automatic reconnection with exponential backoff and subscription re-establishment.
106    /// Reconnection is handled via the `RECONNECTED` message in the message stream.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the WebSocket connection fails.
111    pub async fn connect(&mut self) -> anyhow::Result<()> {
112        let (handler, rx) = channel_message_handler();
113        let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
114
115        // Most blockchain RPC nodes require a heartbeat to keep the connection alive
116        let heartbeat_interval = 30;
117
118        let config = WebSocketConfig {
119            url: self.wss_rpc_url.clone(),
120            headers: vec![user_agent],
121            message_handler: Some(handler),
122            heartbeat: Some(heartbeat_interval),
123            heartbeat_msg: None,
124            ping_handler: None,
125            reconnect_timeout_ms: Some(10_000),
126            reconnect_delay_initial_ms: Some(1_000),
127            reconnect_delay_max_ms: Some(30_000),
128            reconnect_backoff_factor: Some(2.0),
129            reconnect_jitter_ms: Some(1_000),
130        };
131
132        let client = WebSocketClient::connect(config, None, vec![], None).await?;
133
134        self.wss_client = Some(Arc::new(client));
135        self.wss_consumer_rx = Some(rx);
136
137        Ok(())
138    }
139
140    /// Registers a subscription for the specified event type and records it internally with the given ID.
141    async fn subscribe_events(
142        &mut self,
143        event_type: RpcEventType,
144        subscription_id: String,
145    ) -> Result<(), BlockchainRpcClientError> {
146        if let Some(client) = &self.wss_client {
147            log::info!(
148                "Subscribing to '{}' on chain '{}'",
149                subscription_id,
150                self.chain.name
151            );
152            let msg = serde_json::json!({
153                "method": "eth_subscribe",
154                "id": self.request_id,
155                "jsonrpc": "2.0",
156                "params": [subscription_id]
157            });
158            self.pending_subscription_request
159                .insert(self.request_id, event_type.clone());
160            self.request_id += 1;
161            if let Err(e) = client.send_text(msg.to_string(), None).await {
162                log::error!("Error sending subscribe message: {e:?}");
163            }
164
165            // Track subscription for re-establishment on reconnect
166            let mut confirmed = self.subscriptions.write().await;
167            confirmed.insert(event_type, subscription_id);
168
169            Ok(())
170        } else {
171            Err(BlockchainRpcClientError::ClientError(String::from(
172                "Client not connected",
173            )))
174        }
175    }
176
177    /// Re-establishes all confirmed subscriptions after reconnection.
178    async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
179        let subscriptions = self.subscriptions.read().await;
180
181        if subscriptions.is_empty() {
182            log::debug!(
183                "No subscriptions to re-establish for chain '{}'",
184                self.chain.name
185            );
186            return Ok(());
187        }
188
189        log::info!(
190            "Re-establishing {} subscription(s) for chain '{}'",
191            subscriptions.len(),
192            self.chain.name
193        );
194
195        let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
196            .iter()
197            .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
198            .collect();
199
200        drop(subscriptions);
201
202        for (event_type, subscription_id) in subs_to_restore {
203            if let Some(client) = &self.wss_client {
204                log::debug!(
205                    "Re-subscribing to '{}' on chain '{}'",
206                    subscription_id,
207                    self.chain.name
208                );
209
210                let msg = serde_json::json!({
211                    "method": "eth_subscribe",
212                    "id": self.request_id,
213                    "jsonrpc": "2.0",
214                    "params": [subscription_id]
215                });
216
217                self.pending_subscription_request
218                    .insert(self.request_id, event_type);
219                self.request_id += 1;
220
221                if let Err(e) = client.send_text(msg.to_string(), None).await {
222                    log::error!("Error re-subscribing after reconnection: {e:?}");
223                }
224            }
225        }
226
227        Ok(())
228    }
229
230    /// Terminates a subscription with the blockchain node using the provided subscription ID.
231    async fn unsubscribe_events(
232        &self,
233        subscription_id: String,
234    ) -> Result<(), BlockchainRpcClientError> {
235        if let Some(client) = &self.wss_client {
236            log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
237            let msg = serde_json::json!({
238                "method": "eth_unsubscribe",
239                "id": 1,
240                "jsonrpc": "2.0",
241                "params": [subscription_id]
242            });
243            if let Err(e) = client.send_text(msg.to_string(), None).await {
244                log::error!("Error sending unsubscribe message: {e:?}");
245            }
246            Ok(())
247        } else {
248            Err(BlockchainRpcClientError::ClientError(String::from(
249                "Client not connected",
250            )))
251        }
252    }
253
254    /// Waits for and returns the next available message from the WebSocket channel.
255    pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
256        match &mut self.wss_consumer_rx {
257            Some(rx) => rx.recv().await,
258            None => None,
259        }
260    }
261
262    /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
263    ///
264    /// Handles subscription confirmations, events, and reconnection signals automatically.
265    ///
266    /// # Panics
267    ///
268    /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
273    pub async fn next_rpc_message(
274        &mut self,
275    ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
276        while let Some(msg) = self.wait_on_rpc_channel().await {
277            match msg {
278                Message::Text(text) => {
279                    if text == RECONNECTED {
280                        log::info!("Detected reconnection for chain '{}'", self.chain.name);
281                        if let Err(e) = self.resubscribe_all().await {
282                            log::error!("Failed to re-establish subscriptions: {e:?}");
283                        }
284                        continue;
285                    }
286
287                    match serde_json::from_str::<serde_json::Value>(&text) {
288                        Ok(json) => {
289                            if is_subscription_confirmation_response(&json) {
290                                let subscription_request_id =
291                                    json.get("id").unwrap().as_u64().unwrap();
292                                let result = json.get("result").unwrap().as_str().unwrap();
293                                let event_type = self
294                                    .pending_subscription_request
295                                    .get(&subscription_request_id)
296                                    .unwrap();
297                                self.subscription_event_types
298                                    .insert(result.to_string(), event_type.clone());
299                                self.pending_subscription_request
300                                    .remove(&subscription_request_id);
301                                continue;
302                            } else if is_subscription_event(&json) {
303                                let subscription_id = match extract_rpc_subscription_id(&json) {
304                                    Some(id) => id,
305                                    None => {
306                                        return Err(BlockchainRpcClientError::InternalRpcClientError(
307                                        "Error parsing subscription id from valid rpc response"
308                                            .to_string(),
309                                    ));
310                                    }
311                                };
312                                if let Some(event_type) =
313                                    self.subscription_event_types.get(subscription_id)
314                                {
315                                    match event_type {
316                                        RpcEventType::NewBlock => {
317                                            return match serde_json::from_value::<
318                                                RpcNodeWssResponse<Block>,
319                                            >(
320                                                json
321                                            ) {
322                                                Ok(block_response) => {
323                                                    let block = block_response.params.result;
324                                                    Ok(BlockchainMessage::Block(block))
325                                                }
326                                                Err(e) => Err(
327                                                    BlockchainRpcClientError::MessageParsingError(
328                                                        format!(
329                                                            "Error parsing rpc response to block with error {e}"
330                                                        ),
331                                                    ),
332                                                ),
333                                            };
334                                        }
335                                    }
336                                }
337                                return Err(BlockchainRpcClientError::InternalRpcClientError(
338                                    format!(
339                                        "Event type not found for defined subscription id {subscription_id}"
340                                    ),
341                                ));
342                            }
343                            return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
344                                json.to_string(),
345                            ));
346                        }
347                        Err(e) => {
348                            return Err(BlockchainRpcClientError::MessageParsingError(
349                                e.to_string(),
350                            ));
351                        }
352                    }
353                }
354                Message::Pong(_) => {
355                    continue;
356                }
357                _ => {
358                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
359                        msg.to_string(),
360                    ));
361                }
362            }
363        }
364
365        Err(BlockchainRpcClientError::NoMessageReceived)
366    }
367
368    /// Subscribes to real-time block updates from the blockchain node.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the subscription request fails or if the client is not connected.
373    pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
374        self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
375            .await
376    }
377
378    /// Cancels the subscription to real-time block updates.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the unsubscription request fails or if the client is not connected.
383    pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
384        self.unsubscribe_events(String::from("newHeads")).await?;
385
386        let subscription_ids_to_remove: Vec<String> = self
387            .subscription_event_types
388            .iter()
389            .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
390            .map(|(id, _)| id.clone())
391            .collect();
392
393        for id in subscription_ids_to_remove {
394            self.subscription_event_types.remove(&id);
395        }
396
397        let mut confirmed = self.subscriptions.write().await;
398        confirmed.remove(&RpcEventType::NewBlock);
399
400        Ok(())
401    }
402}