nautilus_blockchain/hypersync/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::BTreeSet, sync::Arc};
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22    net_types::{BlockSelection, FieldSelection, Query},
23    simple_types::Log,
24};
25use nautilus_common::runtime::get_runtime;
26use nautilus_model::{
27    defi::{Block, DexType, SharedChain},
28    identifiers::InstrumentId,
29};
30use reqwest::Url;
31
32use crate::{
33    exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
34    rpc::types::BlockchainMessage,
35};
36
37/// The interval, in milliseconds, between height checks while waiting for
38/// HyperSync to index the next block.
39const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
40
41/// A client for interacting with a HyperSync API to retrieve blockchain data.
42#[derive(Debug)]
43pub struct HyperSyncClient {
44    /// The target blockchain identifier (e.g. Ethereum, Arbitrum).
45    chain: SharedChain,
46    /// The underlying HyperSync Rust client for making API requests.
47    client: Arc<hypersync_client::Client>,
48    /// Background task handle for the block subscription task.
49    blocks_task: Option<tokio::task::JoinHandle<()>>,
50    /// Channel for sending blockchain messages to the adapter data client.
51    tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
52    /// Index of pool addresses keyed by instrument ID.
53    pool_addresses: AHashMap<InstrumentId, Address>,
54}
55
56impl HyperSyncClient {
57    /// Creates a new [`HyperSyncClient`] instance for the given chain and message sender.
58    ///
59    /// # Panics
60    ///
61    /// Panics if the chain's `hypersync_url` is invalid or if the underlying client cannot be initialized.
62    #[must_use]
63    pub fn new(
64        chain: SharedChain,
65        tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
66    ) -> Self {
67        let mut config = hypersync_client::ClientConfig::default();
68        let hypersync_url =
69            Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
70        config.url = Some(hypersync_url);
71        let client = hypersync_client::Client::new(config).unwrap();
72
73        Self {
74            chain,
75            client: Arc::new(client),
76            blocks_task: None,
77            tx,
78            pool_addresses: AHashMap::new(),
79        }
80    }
81
82    #[must_use]
83    pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
84        self.pool_addresses.get(&instrument_id)
85    }
86
87    /// Processes DEX contract events for a specific block.
88    ///
89    /// # Panics
90    ///
91    /// Panics if the DEX extended configuration cannot be retrieved or if stream creation fails.
92    pub async fn process_block_dex_contract_events(
93        &self,
94        dex: &DexType,
95        block: u64,
96        contract_addresses: Vec<Address>,
97        swap_event_encoded_signature: String,
98        mint_event_encoded_signature: String,
99        burn_event_encoded_signature: String,
100    ) {
101        let topics = vec![
102            swap_event_encoded_signature.as_str(),
103            &mint_event_encoded_signature.as_str(),
104            &burn_event_encoded_signature.as_str(),
105        ];
106        let query = Self::construct_contract_events_query(
107            block,
108            Some(block + 1),
109            contract_addresses,
110            topics,
111        );
112        let tx = if let Some(tx) = &self.tx {
113            tx.clone()
114        } else {
115            tracing::error!("Hypersync client channel should have been initialized");
116            return;
117        };
118        let client = self.client.clone();
119        let dex_extended =
120            get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
121
122        get_runtime().spawn(async move {
123            let mut rx = client
124                .stream(query, Default::default())
125                .await
126                .expect("Failed to create stream");
127
128            while let Some(response) = rx.recv().await {
129                let response = response.unwrap();
130
131                for batch in response.data.logs {
132                    for log in batch {
133                        let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
134                            Some(log_argument) => {
135                                format!("0x{}", hex::encode(log_argument.as_ref()))
136                            }
137                            None => continue,
138                        };
139                        if event_signature == swap_event_encoded_signature {
140                            match dex_extended.parse_swap_event(log.clone()) {
141                                Ok(swap_event) => {
142                                    if let Err(e) =
143                                        tx.send(BlockchainMessage::SwapEvent(swap_event))
144                                    {
145                                        tracing::error!("Failed to send swap event: {}", e);
146                                    }
147                                }
148                                Err(e) => {
149                                    tracing::error!(
150                                        "Failed to parse swap with error '{:?}' for event: {:?}",
151                                        e,
152                                        log
153                                    );
154                                    continue;
155                                }
156                            }
157                        } else if event_signature == mint_event_encoded_signature {
158                            match dex_extended.parse_mint_event(log.clone()) {
159                                Ok(swap_event) => {
160                                    if let Err(e) =
161                                        tx.send(BlockchainMessage::MintEvent(swap_event))
162                                    {
163                                        tracing::error!("Failed to send mint event: {}", e);
164                                    }
165                                }
166                                Err(e) => {
167                                    tracing::error!(
168                                        "Failed to parse mint with error '{:?}' for event: {:?}",
169                                        e,
170                                        log
171                                    );
172                                    continue;
173                                }
174                            }
175                        } else if event_signature == burn_event_encoded_signature {
176                            match dex_extended.parse_burn_event(log.clone()) {
177                                Ok(swap_event) => {
178                                    if let Err(e) =
179                                        tx.send(BlockchainMessage::BurnEvent(swap_event))
180                                    {
181                                        tracing::error!("Failed to send burn event: {}", e);
182                                    }
183                                }
184                                Err(e) => {
185                                    tracing::error!(
186                                        "Failed to parse burn with error '{:?}' for event: {:?}",
187                                        e,
188                                        log
189                                    );
190                                    continue;
191                                }
192                            }
193                        } else {
194                            tracing::error!("Unknown event signature: {}", event_signature);
195                            continue;
196                        }
197                    }
198                }
199            }
200        });
201    }
202
203    /// Creates a stream of contract event logs matching the specified criteria.
204    ///
205    /// # Panics
206    ///
207    /// Panics if the contract address cannot be parsed as a valid Ethereum address.
208    pub async fn request_contract_events_stream(
209        &self,
210        from_block: u64,
211        to_block: Option<u64>,
212        contract_address: &Address,
213        topics: Vec<&str>,
214    ) -> impl Stream<Item = Log> + use<> {
215        let query = Self::construct_contract_events_query(
216            from_block,
217            to_block,
218            vec![contract_address.clone()],
219            topics,
220        );
221
222        let mut rx = self
223            .client
224            .clone()
225            .stream(query, Default::default())
226            .await
227            .expect("Failed to create stream");
228
229        async_stream::stream! {
230              while let Some(response) = rx.recv().await {
231                let response = response.unwrap();
232
233                for batch in response.data.logs {
234                    for log in batch {
235                        yield log
236                    }
237                }
238            }
239        }
240    }
241
242    /// Disconnects from the HyperSync service and stops all background tasks.
243    pub fn disconnect(&mut self) {
244        self.unsubscribe_blocks();
245    }
246
247    /// Returns the current block
248    ///
249    /// # Panics
250    ///
251    /// Panics if the client height request fails.
252    pub async fn current_block(&self) -> u64 {
253        self.client.get_height().await.unwrap()
254    }
255
256    /// Creates a stream that yields blockchain blocks within the specified range.
257    ///
258    /// # Panics
259    ///
260    /// Panics if the stream creation or block transformation fails.
261    pub async fn request_blocks_stream(
262        &self,
263        from_block: u64,
264        to_block: Option<u64>,
265    ) -> impl Stream<Item = Block> {
266        let query = Self::construct_block_query(from_block, to_block);
267        let mut rx = self
268            .client
269            .clone()
270            .stream(query, Default::default())
271            .await
272            .unwrap();
273
274        let chain = self.chain.name;
275
276        async_stream::stream! {
277            while let Some(response) = rx.recv().await {
278                let response = response.unwrap();
279                for batch in response.data.blocks {
280                        for received_block in batch {
281                            let block = transform_hypersync_block(chain, received_block).unwrap();
282                            yield block
283                        }
284                    }
285            }
286        }
287    }
288
289    /// Starts a background task that continuously polls for new blockchain blocks.
290    ///
291    /// # Panics
292    ///
293    /// Panics if client height requests or block transformations fail.
294    pub fn subscribe_blocks(&mut self) {
295        if self.blocks_task.is_some() {
296            return;
297        }
298
299        let chain = self.chain.name;
300        let client = self.client.clone();
301        let tx = if let Some(tx) = &self.tx {
302            tx.clone()
303        } else {
304            tracing::error!("Hypersync client channel should have been initialized");
305            return;
306        };
307
308        let task = get_runtime().spawn(async move {
309            tracing::debug!("Starting task 'blocks_feed");
310
311            let current_block_height = client.get_height().await.unwrap();
312            let mut query = Self::construct_block_query(current_block_height, None);
313
314            loop {
315                let response = client.get(&query).await.unwrap();
316                for batch in response.data.blocks {
317                    for received_block in batch {
318                        let block = transform_hypersync_block(chain, received_block).unwrap();
319                        let msg = BlockchainMessage::Block(block);
320                        if let Err(e) = tx.send(msg) {
321                            log::error!("Error sending message: {e}");
322                        }
323                    }
324                }
325
326                if let Some(archive_block_height) = response.archive_height
327                    && archive_block_height < response.next_block
328                {
329                    while client.get_height().await.unwrap() < response.next_block {
330                        tokio::time::sleep(std::time::Duration::from_millis(
331                            BLOCK_POLLING_INTERVAL_MS,
332                        ))
333                        .await;
334                    }
335                }
336
337                query.from_block = response.next_block;
338            }
339        });
340
341        self.blocks_task = Some(task);
342    }
343
344    /// Constructs a HyperSync query for fetching blocks with all available fields within the specified range.
345    fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
346        let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
347            .fields
348            .iter()
349            .map(|x| x.name.clone())
350            .collect();
351
352        Query {
353            from_block,
354            to_block,
355            blocks: vec![BlockSelection::default()],
356            field_selection: FieldSelection {
357                block: all_block_fields,
358                ..Default::default()
359            },
360            ..Default::default()
361        }
362    }
363
364    fn construct_contract_events_query(
365        from_block: u64,
366        to_block: Option<u64>,
367        contract_addresses: Vec<Address>,
368        topics: Vec<&str>,
369    ) -> Query {
370        let mut query_value = serde_json::json!({
371            "from_block": from_block,
372            "logs": [{
373                "topics": [topics],
374                "address": contract_addresses
375            }],
376            "field_selection": {
377                "log": [
378                    "block_number",
379                    "transaction_hash",
380                    "transaction_index",
381                    "log_index",
382                    "address",
383                    "data",
384                    "topic0",
385                    "topic1",
386                    "topic2",
387                    "topic3",
388                ]
389            }
390        });
391
392        if let Some(to_block) = to_block
393            && let Some(obj) = query_value.as_object_mut()
394        {
395            obj.insert("to_block".to_string(), serde_json::json!(to_block));
396        }
397
398        serde_json::from_value(query_value).unwrap()
399    }
400
401    /// Unsubscribes from new blocks by stopping the background watch task.
402    pub fn unsubscribe_blocks(&mut self) {
403        if let Some(task) = self.blocks_task.take() {
404            task.abort();
405            tracing::debug!("Unsubscribed from blocks");
406        }
407    }
408}