nautilus_blockchain/rpc/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21    RECONNECTED,
22    http::USER_AGENT,
23    websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
24};
25use tokio_tungstenite::tungstenite::Message;
26
27use crate::rpc::{
28    error::BlockchainRpcClientError,
29    types::{BlockchainMessage, RpcEventType},
30    utils::{
31        extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
32    },
33};
34
35/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
36///
37/// It provides a shared implementation of common blockchain RPC functionality, handling:
38/// - WebSocket connection management with blockchain RPC node.
39/// - Subscription lifecycle (creation, tracking, and termination).
40/// - Message serialization and deserialization of RPC messages.
41/// - Event type mapping and dispatching.
42/// - Automatic subscription re-establishment on reconnection.
43pub struct CoreBlockchainRpcClient {
44    /// The blockchain network type this client connects to.
45    chain: Chain,
46    /// WebSocket secure URL for the blockchain node's RPC endpoint.
47    wss_rpc_url: String,
48    /// Auto-incrementing counter for generating unique RPC request IDs.
49    request_id: u64,
50    /// Tracks in-flight subscription requests by mapping request IDs to their event types.
51    pending_subscription_request: HashMap<u64, RpcEventType>,
52    /// Maps active subscription IDs to their corresponding event types for message
53    /// deserialization.
54    subscription_event_types: HashMap<String, RpcEventType>,
55    /// The active WebSocket client connection.
56    wss_client: Option<Arc<WebSocketClient>>,
57    /// Channel receiver for consuming WebSocket messages.
58    wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
59    /// Tracks confirmed subscriptions that need to be re-established on reconnection.
60    subscriptions: Arc<tokio::sync::RwLock<HashMap<RpcEventType, String>>>,
61}
62
63impl Debug for CoreBlockchainRpcClient {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("CoreBlockchainRpcClient")
66            .field("chain", &self.chain)
67            .field("wss_rpc_url", &self.wss_rpc_url)
68            .field("request_id", &self.request_id)
69            .field(
70                "pending_subscription_request",
71                &self.pending_subscription_request,
72            )
73            .field("subscription_event_types", &self.subscription_event_types)
74            .field(
75                "wss_client",
76                &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
77            )
78            .field(
79                "wss_consumer_rx",
80                &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
81            )
82            .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
83            .finish()
84    }
85}
86
87impl CoreBlockchainRpcClient {
88    #[must_use]
89    pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
90        Self {
91            chain,
92            wss_rpc_url,
93            request_id: 1,
94            wss_client: None,
95            pending_subscription_request: HashMap::new(),
96            subscription_event_types: HashMap::new(),
97            wss_consumer_rx: None,
98            subscriptions: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
99        }
100    }
101
102    /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
103    ///
104    /// Configures automatic reconnection with exponential backoff and subscription re-establishment.
105    /// Reconnection is handled via the `RECONNECTED` message in the message stream.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the WebSocket connection fails.
110    pub async fn connect(&mut self) -> anyhow::Result<()> {
111        let (handler, rx) = channel_message_handler();
112        let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
113
114        // Most blockchain RPC nodes require a heartbeat to keep the connection alive
115        let heartbeat_interval = 30;
116
117        let config = WebSocketConfig {
118            url: self.wss_rpc_url.clone(),
119            headers: vec![user_agent],
120            heartbeat: Some(heartbeat_interval),
121            heartbeat_msg: None,
122            reconnect_timeout_ms: Some(10_000),
123            reconnect_delay_initial_ms: Some(1_000),
124            reconnect_delay_max_ms: Some(30_000),
125            reconnect_backoff_factor: Some(2.0),
126            reconnect_jitter_ms: Some(1_000),
127            reconnect_max_attempts: None,
128        };
129
130        let client =
131            WebSocketClient::connect(config, Some(handler), None, None, vec![], None).await?;
132
133        self.wss_client = Some(Arc::new(client));
134        self.wss_consumer_rx = Some(rx);
135
136        Ok(())
137    }
138
139    /// Registers a subscription for the specified event type and records it internally with the given ID.
140    async fn subscribe_events(
141        &mut self,
142        event_type: RpcEventType,
143        subscription_id: String,
144    ) -> Result<(), BlockchainRpcClientError> {
145        if let Some(client) = &self.wss_client {
146            log::info!(
147                "Subscribing to '{}' on chain '{}'",
148                subscription_id,
149                self.chain.name
150            );
151            let msg = serde_json::json!({
152                "method": "eth_subscribe",
153                "id": self.request_id,
154                "jsonrpc": "2.0",
155                "params": [subscription_id]
156            });
157            self.pending_subscription_request
158                .insert(self.request_id, event_type.clone());
159            self.request_id += 1;
160            if let Err(e) = client.send_text(msg.to_string(), None).await {
161                log::error!("Error sending subscribe message: {e:?}");
162            }
163
164            // Track subscription for re-establishment on reconnect
165            let mut confirmed = self.subscriptions.write().await;
166            confirmed.insert(event_type, subscription_id);
167
168            Ok(())
169        } else {
170            Err(BlockchainRpcClientError::ClientError(String::from(
171                "Client not connected",
172            )))
173        }
174    }
175
176    /// Re-establishes all confirmed subscriptions after reconnection.
177    async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
178        let subscriptions = self.subscriptions.read().await;
179
180        if subscriptions.is_empty() {
181            log::debug!(
182                "No subscriptions to re-establish for chain '{}'",
183                self.chain.name
184            );
185            return Ok(());
186        }
187
188        log::info!(
189            "Re-establishing {} subscription(s) for chain '{}'",
190            subscriptions.len(),
191            self.chain.name
192        );
193
194        let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
195            .iter()
196            .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
197            .collect();
198
199        drop(subscriptions);
200
201        for (event_type, subscription_id) in subs_to_restore {
202            if let Some(client) = &self.wss_client {
203                log::debug!(
204                    "Re-subscribing to '{}' on chain '{}'",
205                    subscription_id,
206                    self.chain.name
207                );
208
209                let msg = serde_json::json!({
210                    "method": "eth_subscribe",
211                    "id": self.request_id,
212                    "jsonrpc": "2.0",
213                    "params": [subscription_id]
214                });
215
216                self.pending_subscription_request
217                    .insert(self.request_id, event_type);
218                self.request_id += 1;
219
220                if let Err(e) = client.send_text(msg.to_string(), None).await {
221                    log::error!("Error re-subscribing after reconnection: {e:?}");
222                }
223            }
224        }
225
226        Ok(())
227    }
228
229    /// Terminates a subscription with the blockchain node using the provided subscription ID.
230    async fn unsubscribe_events(
231        &self,
232        subscription_id: String,
233    ) -> Result<(), BlockchainRpcClientError> {
234        if let Some(client) = &self.wss_client {
235            log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
236            let msg = serde_json::json!({
237                "method": "eth_unsubscribe",
238                "id": 1,
239                "jsonrpc": "2.0",
240                "params": [subscription_id]
241            });
242            if let Err(e) = client.send_text(msg.to_string(), None).await {
243                log::error!("Error sending unsubscribe message: {e:?}");
244            }
245            Ok(())
246        } else {
247            Err(BlockchainRpcClientError::ClientError(String::from(
248                "Client not connected",
249            )))
250        }
251    }
252
253    /// Waits for and returns the next available message from the WebSocket channel.
254    pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
255        match &mut self.wss_consumer_rx {
256            Some(rx) => rx.recv().await,
257            None => None,
258        }
259    }
260
261    /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
262    ///
263    /// Handles subscription confirmations, events, and reconnection signals automatically.
264    ///
265    /// # Panics
266    ///
267    /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
272    pub async fn next_rpc_message(
273        &mut self,
274    ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
275        while let Some(msg) = self.wait_on_rpc_channel().await {
276            match msg {
277                Message::Text(text) => {
278                    if text == RECONNECTED {
279                        log::info!("Detected reconnection for chain '{}'", self.chain.name);
280                        if let Err(e) = self.resubscribe_all().await {
281                            log::error!("Failed to re-establish subscriptions: {e:?}");
282                        }
283                        continue;
284                    }
285
286                    match serde_json::from_str::<serde_json::Value>(&text) {
287                        Ok(json) => {
288                            if is_subscription_confirmation_response(&json) {
289                                let subscription_request_id =
290                                    json.get("id").unwrap().as_u64().unwrap();
291                                let result = json.get("result").unwrap().as_str().unwrap();
292                                let event_type = self
293                                    .pending_subscription_request
294                                    .get(&subscription_request_id)
295                                    .unwrap();
296                                self.subscription_event_types
297                                    .insert(result.to_string(), event_type.clone());
298                                self.pending_subscription_request
299                                    .remove(&subscription_request_id);
300                                continue;
301                            } else if is_subscription_event(&json) {
302                                let subscription_id = match extract_rpc_subscription_id(&json) {
303                                    Some(id) => id,
304                                    None => {
305                                        return Err(BlockchainRpcClientError::InternalRpcClientError(
306                                        "Error parsing subscription id from valid rpc response"
307                                            .to_string(),
308                                    ));
309                                    }
310                                };
311                                if let Some(event_type) =
312                                    self.subscription_event_types.get(subscription_id)
313                                {
314                                    match event_type {
315                                        RpcEventType::NewBlock => {
316                                            return match serde_json::from_value::<
317                                                RpcNodeWssResponse<Block>,
318                                            >(
319                                                json
320                                            ) {
321                                                Ok(block_response) => {
322                                                    let block = block_response.params.result;
323                                                    Ok(BlockchainMessage::Block(block))
324                                                }
325                                                Err(e) => Err(
326                                                    BlockchainRpcClientError::MessageParsingError(
327                                                        format!(
328                                                            "Error parsing rpc response to block with error {e}"
329                                                        ),
330                                                    ),
331                                                ),
332                                            };
333                                        }
334                                    }
335                                }
336                                return Err(BlockchainRpcClientError::InternalRpcClientError(
337                                    format!(
338                                        "Event type not found for defined subscription id {subscription_id}"
339                                    ),
340                                ));
341                            }
342                            return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
343                                json.to_string(),
344                            ));
345                        }
346                        Err(e) => {
347                            return Err(BlockchainRpcClientError::MessageParsingError(
348                                e.to_string(),
349                            ));
350                        }
351                    }
352                }
353                Message::Pong(_) => {
354                    continue;
355                }
356                _ => {
357                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
358                        msg.to_string(),
359                    ));
360                }
361            }
362        }
363
364        Err(BlockchainRpcClientError::NoMessageReceived)
365    }
366
367    /// Subscribes to real-time block updates from the blockchain node.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if the subscription request fails or if the client is not connected.
372    pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
373        self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
374            .await
375    }
376
377    /// Cancels the subscription to real-time block updates.
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if the unsubscription request fails or if the client is not connected.
382    pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
383        self.unsubscribe_events(String::from("newHeads")).await?;
384
385        let subscription_ids_to_remove: Vec<String> = self
386            .subscription_event_types
387            .iter()
388            .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
389            .map(|(id, _)| id.clone())
390            .collect();
391
392        for id in subscription_ids_to_remove {
393            self.subscription_event_types.remove(&id);
394        }
395
396        let mut confirmed = self.subscriptions.write().await;
397        confirmed.remove(&RpcEventType::NewBlock);
398
399        Ok(())
400    }
401}