nautilus_blockchain/rpc/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
21use reqwest::header::USER_AGENT;
22use tokio_tungstenite::tungstenite::Message;
23
24use crate::rpc::{
25    error::BlockchainRpcClientError,
26    types::{BlockchainMessage, RpcEventType},
27    utils::{
28        extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
29    },
30};
31
32/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
33///
34/// It provides a shared implementation of common blockchain RPC functionality, handling:
35/// - WebSocket connection management with blockchain RPC node.
36/// - Subscription lifecycle (creation, tracking, and termination).
37/// - Message serialization and deserialization of RPC messages.
38/// - Event type mapping and dispatching.
39#[derive(Debug)]
40pub struct CoreBlockchainRpcClient {
41    /// The blockchain network type this client connects to.
42    chain: Chain,
43    /// WebSocket secure URL for the blockchain node's RPC endpoint.
44    wss_rpc_url: String,
45    /// Auto-incrementing counter for generating unique RPC request IDs.
46    request_id: u64,
47    /// Tracks in-flight subscription requests by mapping request IDs to their event types.
48    pending_subscription_request: HashMap<u64, RpcEventType>,
49    /// Maps active subscription IDs to their corresponding event types for message
50    /// deserialization.
51    subscription_event_types: HashMap<String, RpcEventType>,
52    /// The active WebSocket client connection.
53    wss_client: Option<Arc<WebSocketClient>>,
54    /// Channel receiver for consuming WebSocket messages.
55    wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
56}
57
58impl CoreBlockchainRpcClient {
59    #[must_use]
60    pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
61        Self {
62            chain,
63            wss_rpc_url,
64            request_id: 1,
65            wss_client: None,
66            pending_subscription_request: HashMap::new(),
67            subscription_event_types: HashMap::new(),
68            wss_consumer_rx: None,
69        }
70    }
71
72    /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the WebSocket connection fails.
77    pub async fn connect(&mut self) -> anyhow::Result<()> {
78        let (handler, rx) = channel_message_handler();
79        let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
80        // Most of the blockchain rpc nodes require a heartbeat to keep the connection alive
81        let heartbeat_interval = 30;
82        let config = WebSocketConfig {
83            url: self.wss_rpc_url.clone(),
84            headers: vec![user_agent],
85            message_handler: Some(handler),
86            heartbeat: Some(heartbeat_interval),
87            heartbeat_msg: None,
88            ping_handler: None,
89            reconnect_timeout_ms: Some(5_000),
90            reconnect_delay_initial_ms: None,
91            reconnect_delay_max_ms: None,
92            reconnect_backoff_factor: None,
93            reconnect_jitter_ms: None,
94        };
95        let client = WebSocketClient::connect(
96            config,
97            None,   // post_reconnection
98            vec![], // keyed_quotas
99            None,   // default_quota
100        )
101        .await?;
102
103        self.wss_client = Some(Arc::new(client));
104        self.wss_consumer_rx = Some(rx);
105
106        Ok(())
107    }
108
109    /// Registers a subscription for the specified event type and records it internally with the given ID.
110    async fn subscribe_events(
111        &mut self,
112        event_type: RpcEventType,
113        subscription_id: String,
114    ) -> Result<(), BlockchainRpcClientError> {
115        if let Some(client) = &self.wss_client {
116            log::info!("Subscribing to new blocks on chain '{}'", self.chain.name);
117            let msg = serde_json::json!({
118                "method": "eth_subscribe",
119                "id": self.request_id,
120                "jsonrpc": "2.0",
121                "params": [subscription_id]
122            });
123            self.pending_subscription_request
124                .insert(self.request_id, event_type);
125            self.request_id += 1;
126            if let Err(err) = client.send_text(msg.to_string(), None).await {
127                log::error!("Error sending subscribe message: {err:?}");
128            }
129            Ok(())
130        } else {
131            Err(BlockchainRpcClientError::ClientError(String::from(
132                "Client not connected",
133            )))
134        }
135    }
136
137    /// Terminates a subscription with the blockchain node using the provided subscription ID.
138    async fn unsubscribe_events(
139        &self,
140        subscription_id: String,
141    ) -> Result<(), BlockchainRpcClientError> {
142        if let Some(client) = &self.wss_client {
143            log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
144            let msg = serde_json::json!({
145                "method": "eth_unsubscribe",
146                "id": 1,
147                "jsonrpc": "2.0",
148                "params": [subscription_id]
149            });
150            if let Err(err) = client.send_text(msg.to_string(), None).await {
151                log::error!("Error sending unsubscribe message: {err:?}");
152            }
153            Ok(())
154        } else {
155            Err(BlockchainRpcClientError::ClientError(String::from(
156                "Client not connected",
157            )))
158        }
159    }
160
161    /// Waits for and returns the next available message from the WebSocket channel.
162    pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
163        match &mut self.wss_consumer_rx {
164            Some(rx) => rx.recv().await,
165            None => None,
166        }
167    }
168
169    /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
170    ///
171    /// # Panics
172    ///
173    /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
178    pub async fn next_rpc_message(
179        &mut self,
180    ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
181        while let Some(msg) = self.wait_on_rpc_channel().await {
182            match msg {
183                Message::Text(text) => match serde_json::from_str::<serde_json::Value>(&text) {
184                    Ok(json) => {
185                        if is_subscription_confirmation_response(&json) {
186                            let subscription_request_id = json.get("id").unwrap().as_u64().unwrap();
187                            let result = json.get("result").unwrap().as_str().unwrap();
188                            let event_type = self
189                                .pending_subscription_request
190                                .get(&subscription_request_id)
191                                .unwrap();
192                            self.subscription_event_types
193                                .insert(result.to_string(), event_type.clone());
194                            self.pending_subscription_request
195                                .remove(&subscription_request_id);
196                            continue;
197                        } else if is_subscription_event(&json) {
198                            let subscription_id = match extract_rpc_subscription_id(&json) {
199                                Some(id) => id,
200                                None => {
201                                    return Err(BlockchainRpcClientError::InternalRpcClientError(
202                                        "Error parsing subscription id from valid rpc response"
203                                            .to_string(),
204                                    ));
205                                }
206                            };
207                            if let Some(event_type) =
208                                self.subscription_event_types.get(subscription_id)
209                            {
210                                match event_type {
211                                    RpcEventType::NewBlock => {
212                                        return match serde_json::from_value::<
213                                            RpcNodeWssResponse<Block>,
214                                        >(json)
215                                        {
216                                            Ok(block_response) => {
217                                                let block = block_response.params.result;
218                                                Ok(BlockchainMessage::Block(block))
219                                            }
220                                            Err(e) => {
221                                                Err(BlockchainRpcClientError::MessageParsingError(
222                                                    format!(
223                                                        "Error parsing rpc response to block with error {e}"
224                                                    ),
225                                                ))
226                                            }
227                                        };
228                                    }
229                                }
230                            }
231                            return Err(BlockchainRpcClientError::InternalRpcClientError(format!(
232                                "Event type not found for defined subscription id {subscription_id}"
233                            )));
234                        }
235                        return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
236                            json.to_string(),
237                        ));
238                    }
239                    Err(e) => {
240                        return Err(BlockchainRpcClientError::MessageParsingError(e.to_string()));
241                    }
242                },
243                Message::Pong(_) => {
244                    continue;
245                }
246                _ => {
247                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
248                        msg.to_string(),
249                    ));
250                }
251            }
252        }
253
254        Err(BlockchainRpcClientError::NoMessageReceived)
255    }
256
257    /// Subscribes to real-time block updates from the blockchain node.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the subscription request fails or if the client is not connected.
262    pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
263        self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
264            .await
265    }
266
267    /// Cancels the subscription to real-time block updates.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if the unsubscription request fails or if the client is not connected.
272    pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
273        self.unsubscribe_events(String::from("newHeads")).await?;
274
275        // Find and remove the subscription ID associated with the newBlock event type
276        let subscription_ids_to_remove: Vec<String> = self
277            .subscription_event_types
278            .iter()
279            .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
280            .map(|(id, _)| id.clone())
281            .collect();
282
283        for id in subscription_ids_to_remove {
284            self.subscription_event_types.remove(&id);
285        }
286        Ok(())
287    }
288}