nautilus_blockchain/rpc/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21    RECONNECTED,
22    websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
23};
24use reqwest::header::USER_AGENT;
25use tokio::sync::RwLock;
26use tokio_tungstenite::tungstenite::Message;
27
28use crate::rpc::{
29    error::BlockchainRpcClientError,
30    types::{BlockchainMessage, RpcEventType},
31    utils::{
32        extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
33    },
34};
35
36/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
37///
38/// It provides a shared implementation of common blockchain RPC functionality, handling:
39/// - WebSocket connection management with blockchain RPC node.
40/// - Subscription lifecycle (creation, tracking, and termination).
41/// - Message serialization and deserialization of RPC messages.
42/// - Event type mapping and dispatching.
43/// - Automatic subscription re-establishment on reconnection.
44pub struct CoreBlockchainRpcClient {
45    /// The blockchain network type this client connects to.
46    chain: Chain,
47    /// WebSocket secure URL for the blockchain node's RPC endpoint.
48    wss_rpc_url: String,
49    /// Auto-incrementing counter for generating unique RPC request IDs.
50    request_id: u64,
51    /// Tracks in-flight subscription requests by mapping request IDs to their event types.
52    pending_subscription_request: HashMap<u64, RpcEventType>,
53    /// Maps active subscription IDs to their corresponding event types for message
54    /// deserialization.
55    subscription_event_types: HashMap<String, RpcEventType>,
56    /// The active WebSocket client connection.
57    wss_client: Option<Arc<WebSocketClient>>,
58    /// Channel receiver for consuming WebSocket messages.
59    wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
60    /// Tracks confirmed subscriptions that need to be re-established on reconnection.
61    subscriptions: Arc<RwLock<HashMap<RpcEventType, String>>>,
62}
63
64impl Debug for CoreBlockchainRpcClient {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("CoreBlockchainRpcClient")
67            .field("chain", &self.chain)
68            .field("wss_rpc_url", &self.wss_rpc_url)
69            .field("request_id", &self.request_id)
70            .field(
71                "pending_subscription_request",
72                &self.pending_subscription_request,
73            )
74            .field("subscription_event_types", &self.subscription_event_types)
75            .field(
76                "wss_client",
77                &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
78            )
79            .field(
80                "wss_consumer_rx",
81                &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
82            )
83            .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
84            .finish()
85    }
86}
87
88impl CoreBlockchainRpcClient {
89    #[must_use]
90    pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
91        Self {
92            chain,
93            wss_rpc_url,
94            request_id: 1,
95            wss_client: None,
96            pending_subscription_request: HashMap::new(),
97            subscription_event_types: HashMap::new(),
98            wss_consumer_rx: None,
99            subscriptions: Arc::new(RwLock::new(HashMap::new())),
100        }
101    }
102
103    /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
104    ///
105    /// Configures automatic reconnection with exponential backoff and subscription re-establishment.
106    /// Reconnection is handled via the `RECONNECTED` message in the message stream.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the WebSocket connection fails.
111    pub async fn connect(&mut self) -> anyhow::Result<()> {
112        let (handler, rx) = channel_message_handler();
113        let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
114
115        // Most blockchain RPC nodes require a heartbeat to keep the connection alive
116        let heartbeat_interval = 30;
117
118        let config = WebSocketConfig {
119            url: self.wss_rpc_url.clone(),
120            headers: vec![user_agent],
121            message_handler: Some(handler),
122            heartbeat: Some(heartbeat_interval),
123            heartbeat_msg: None,
124            ping_handler: None,
125            reconnect_timeout_ms: Some(10_000),
126            reconnect_delay_initial_ms: Some(1_000),
127            reconnect_delay_max_ms: Some(30_000),
128            reconnect_backoff_factor: Some(2.0),
129            reconnect_jitter_ms: Some(1_000),
130            reconnect_max_attempts: None,
131        };
132
133        let client = WebSocketClient::connect(config, None, vec![], None).await?;
134
135        self.wss_client = Some(Arc::new(client));
136        self.wss_consumer_rx = Some(rx);
137
138        Ok(())
139    }
140
141    /// Registers a subscription for the specified event type and records it internally with the given ID.
142    async fn subscribe_events(
143        &mut self,
144        event_type: RpcEventType,
145        subscription_id: String,
146    ) -> Result<(), BlockchainRpcClientError> {
147        if let Some(client) = &self.wss_client {
148            log::info!(
149                "Subscribing to '{}' on chain '{}'",
150                subscription_id,
151                self.chain.name
152            );
153            let msg = serde_json::json!({
154                "method": "eth_subscribe",
155                "id": self.request_id,
156                "jsonrpc": "2.0",
157                "params": [subscription_id]
158            });
159            self.pending_subscription_request
160                .insert(self.request_id, event_type.clone());
161            self.request_id += 1;
162            if let Err(e) = client.send_text(msg.to_string(), None).await {
163                log::error!("Error sending subscribe message: {e:?}");
164            }
165
166            // Track subscription for re-establishment on reconnect
167            let mut confirmed = self.subscriptions.write().await;
168            confirmed.insert(event_type, subscription_id);
169
170            Ok(())
171        } else {
172            Err(BlockchainRpcClientError::ClientError(String::from(
173                "Client not connected",
174            )))
175        }
176    }
177
178    /// Re-establishes all confirmed subscriptions after reconnection.
179    async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
180        let subscriptions = self.subscriptions.read().await;
181
182        if subscriptions.is_empty() {
183            log::debug!(
184                "No subscriptions to re-establish for chain '{}'",
185                self.chain.name
186            );
187            return Ok(());
188        }
189
190        log::info!(
191            "Re-establishing {} subscription(s) for chain '{}'",
192            subscriptions.len(),
193            self.chain.name
194        );
195
196        let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
197            .iter()
198            .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
199            .collect();
200
201        drop(subscriptions);
202
203        for (event_type, subscription_id) in subs_to_restore {
204            if let Some(client) = &self.wss_client {
205                log::debug!(
206                    "Re-subscribing to '{}' on chain '{}'",
207                    subscription_id,
208                    self.chain.name
209                );
210
211                let msg = serde_json::json!({
212                    "method": "eth_subscribe",
213                    "id": self.request_id,
214                    "jsonrpc": "2.0",
215                    "params": [subscription_id]
216                });
217
218                self.pending_subscription_request
219                    .insert(self.request_id, event_type);
220                self.request_id += 1;
221
222                if let Err(e) = client.send_text(msg.to_string(), None).await {
223                    log::error!("Error re-subscribing after reconnection: {e:?}");
224                }
225            }
226        }
227
228        Ok(())
229    }
230
231    /// Terminates a subscription with the blockchain node using the provided subscription ID.
232    async fn unsubscribe_events(
233        &self,
234        subscription_id: String,
235    ) -> Result<(), BlockchainRpcClientError> {
236        if let Some(client) = &self.wss_client {
237            log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
238            let msg = serde_json::json!({
239                "method": "eth_unsubscribe",
240                "id": 1,
241                "jsonrpc": "2.0",
242                "params": [subscription_id]
243            });
244            if let Err(e) = client.send_text(msg.to_string(), None).await {
245                log::error!("Error sending unsubscribe message: {e:?}");
246            }
247            Ok(())
248        } else {
249            Err(BlockchainRpcClientError::ClientError(String::from(
250                "Client not connected",
251            )))
252        }
253    }
254
255    /// Waits for and returns the next available message from the WebSocket channel.
256    pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
257        match &mut self.wss_consumer_rx {
258            Some(rx) => rx.recv().await,
259            None => None,
260        }
261    }
262
263    /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
264    ///
265    /// Handles subscription confirmations, events, and reconnection signals automatically.
266    ///
267    /// # Panics
268    ///
269    /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
274    pub async fn next_rpc_message(
275        &mut self,
276    ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
277        while let Some(msg) = self.wait_on_rpc_channel().await {
278            match msg {
279                Message::Text(text) => {
280                    if text == RECONNECTED {
281                        log::info!("Detected reconnection for chain '{}'", self.chain.name);
282                        if let Err(e) = self.resubscribe_all().await {
283                            log::error!("Failed to re-establish subscriptions: {e:?}");
284                        }
285                        continue;
286                    }
287
288                    match serde_json::from_str::<serde_json::Value>(&text) {
289                        Ok(json) => {
290                            if is_subscription_confirmation_response(&json) {
291                                let subscription_request_id =
292                                    json.get("id").unwrap().as_u64().unwrap();
293                                let result = json.get("result").unwrap().as_str().unwrap();
294                                let event_type = self
295                                    .pending_subscription_request
296                                    .get(&subscription_request_id)
297                                    .unwrap();
298                                self.subscription_event_types
299                                    .insert(result.to_string(), event_type.clone());
300                                self.pending_subscription_request
301                                    .remove(&subscription_request_id);
302                                continue;
303                            } else if is_subscription_event(&json) {
304                                let subscription_id = match extract_rpc_subscription_id(&json) {
305                                    Some(id) => id,
306                                    None => {
307                                        return Err(BlockchainRpcClientError::InternalRpcClientError(
308                                        "Error parsing subscription id from valid rpc response"
309                                            .to_string(),
310                                    ));
311                                    }
312                                };
313                                if let Some(event_type) =
314                                    self.subscription_event_types.get(subscription_id)
315                                {
316                                    match event_type {
317                                        RpcEventType::NewBlock => {
318                                            return match serde_json::from_value::<
319                                                RpcNodeWssResponse<Block>,
320                                            >(
321                                                json
322                                            ) {
323                                                Ok(block_response) => {
324                                                    let block = block_response.params.result;
325                                                    Ok(BlockchainMessage::Block(block))
326                                                }
327                                                Err(e) => Err(
328                                                    BlockchainRpcClientError::MessageParsingError(
329                                                        format!(
330                                                            "Error parsing rpc response to block with error {e}"
331                                                        ),
332                                                    ),
333                                                ),
334                                            };
335                                        }
336                                    }
337                                }
338                                return Err(BlockchainRpcClientError::InternalRpcClientError(
339                                    format!(
340                                        "Event type not found for defined subscription id {subscription_id}"
341                                    ),
342                                ));
343                            }
344                            return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
345                                json.to_string(),
346                            ));
347                        }
348                        Err(e) => {
349                            return Err(BlockchainRpcClientError::MessageParsingError(
350                                e.to_string(),
351                            ));
352                        }
353                    }
354                }
355                Message::Pong(_) => {
356                    continue;
357                }
358                _ => {
359                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
360                        msg.to_string(),
361                    ));
362                }
363            }
364        }
365
366        Err(BlockchainRpcClientError::NoMessageReceived)
367    }
368
369    /// Subscribes to real-time block updates from the blockchain node.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the subscription request fails or if the client is not connected.
374    pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
375        self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
376            .await
377    }
378
379    /// Cancels the subscription to real-time block updates.
380    ///
381    /// # Errors
382    ///
383    /// Returns an error if the unsubscription request fails or if the client is not connected.
384    pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
385        self.unsubscribe_events(String::from("newHeads")).await?;
386
387        let subscription_ids_to_remove: Vec<String> = self
388            .subscription_event_types
389            .iter()
390            .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
391            .map(|(id, _)| id.clone())
392            .collect();
393
394        for id in subscription_ids_to_remove {
395            self.subscription_event_types.remove(&id);
396        }
397
398        let mut confirmed = self.subscriptions.write().await;
399        confirmed.remove(&RpcEventType::NewBlock);
400
401        Ok(())
402    }
403}