nautilus_blockchain/hypersync/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::BTreeSet, sync::Arc};
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22    net_types::{BlockSelection, FieldSelection, Query},
23    simple_types::Log,
24};
25use nautilus_common::runtime::get_runtime;
26use nautilus_model::{
27    defi::{Block, DexType, SharedChain},
28    identifiers::InstrumentId,
29};
30use reqwest::Url;
31
32use crate::{
33    exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
34    rpc::types::BlockchainMessage,
35};
36
37/// The interval in milliseconds between chain height checks while the blocks
38/// subscription task waits for HyperSync to reach the next block to request.
39const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
40
41/// Timeout in seconds applied to each HyperSync request made by the blocks subscription task.
42const HYPERSYNC_REQUEST_TIMEOUT_SECS: u64 = 30;
43
44/// Timeout in seconds for graceful shutdown of the blocks task during disconnect.
45/// If the task doesn't finish within this time, it is forcefully aborted.
46const DISCONNECT_TIMEOUT_SECS: u64 = 5;
47
48/// A client for interacting with a HyperSync API to retrieve blockchain data.
49#[derive(Debug)]
50pub struct HyperSyncClient {
51    /// The target chain; supplies the chain name and the HyperSync URL used by this client.
52    chain: SharedChain,
53    /// The underlying HyperSync Rust client, shared with spawned background tasks.
54    client: Arc<hypersync_client::Client>,
55    /// Handle of the background blocks subscription task, if one has been started.
56    blocks_task: Option<tokio::task::JoinHandle<()>>,
57    /// Child cancellation token used to stop only the blocks subscription task.
58    blocks_cancellation_token: Option<tokio_util::sync::CancellationToken>,
59    /// Channel for sending blockchain messages to the adapter data client.
60    tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
61    /// Index of pool addresses keyed by instrument ID.
62    pool_addresses: AHashMap<InstrumentId, Address>,
63    /// Parent cancellation token; cancelling it signals every background task to shut down.
64    cancellation_token: tokio_util::sync::CancellationToken,
65}
66
67impl HyperSyncClient {
68    /// Creates a new [`HyperSyncClient`] instance for the given chain and message sender.
69    ///
70    /// # Panics
71    ///
72    /// Panics if the chain's `hypersync_url` is invalid or if the underlying client cannot be initialized.
73    #[must_use]
74    pub fn new(
75        chain: SharedChain,
76        tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
77        cancellation_token: tokio_util::sync::CancellationToken,
78    ) -> Self {
79        let mut config = hypersync_client::ClientConfig::default();
80        let hypersync_url =
81            Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
82        config.url = Some(hypersync_url);
83        let client = hypersync_client::Client::new(config).unwrap();
84
85        Self {
86            chain,
87            client: Arc::new(client),
88            blocks_task: None,
89            blocks_cancellation_token: None,
90            tx,
91            pool_addresses: AHashMap::new(),
92            cancellation_token,
93        }
94    }
95
96    #[must_use]
97    pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
98        self.pool_addresses.get(&instrument_id)
99    }
100
101    /// Processes DEX contract events for a specific block.
102    ///
103    /// # Panics
104    ///
105    /// Panics if the DEX extended configuration cannot be retrieved or if stream creation fails.
106    pub fn process_block_dex_contract_events(
107        &mut self,
108        dex: &DexType,
109        block: u64,
110        contract_addresses: Vec<Address>,
111        swap_event_encoded_signature: String,
112        mint_event_encoded_signature: String,
113        burn_event_encoded_signature: String,
114    ) {
115        let topics = vec![
116            swap_event_encoded_signature.as_str(),
117            &mint_event_encoded_signature.as_str(),
118            &burn_event_encoded_signature.as_str(),
119        ];
120        let query = Self::construct_contract_events_query(
121            block,
122            Some(block + 1),
123            contract_addresses,
124            topics,
125        );
126        let tx = if let Some(tx) = &self.tx {
127            tx.clone()
128        } else {
129            tracing::error!("Hypersync client channel should have been initialized");
130            return;
131        };
132        let client = self.client.clone();
133        let dex_extended =
134            get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
135        let cancellation_token = self.cancellation_token.clone();
136
137        let _task = get_runtime().spawn(async move {
138            let mut rx = match client.stream(query, Default::default()).await {
139                Ok(rx) => rx,
140                Err(e) => {
141                    tracing::error!("Failed to create DEX event stream: {e}");
142                    return;
143                }
144            };
145
146            loop {
147                tokio::select! {
148                    () = cancellation_token.cancelled() => {
149                        tracing::debug!("DEX event processing task received cancellation signal");
150                        break;
151                    }
152                    response = rx.recv() => {
153                        let Some(response) = response else {
154                            break;
155                        };
156
157                        let response = match response {
158                            Ok(resp) => resp,
159                            Err(e) => {
160                                tracing::error!("Failed to receive DEX event stream response: {e}");
161                                break;
162                            }
163                        };
164
165                        for batch in response.data.logs {
166                            for log in batch {
167                                let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
168                                    Some(log_argument) => {
169                                        format!("0x{}", hex::encode(log_argument.as_ref()))
170                                    }
171                                    None => continue,
172                                };
173                                if event_signature == swap_event_encoded_signature {
174                                    match dex_extended.parse_swap_event(log.clone()) {
175                                        Ok(swap_event) => {
176                                            if let Err(e) =
177                                                tx.send(BlockchainMessage::SwapEvent(swap_event))
178                                            {
179                                                tracing::error!("Failed to send swap event: {e}");
180                                            }
181                                        }
182                                        Err(e) => {
183                                            tracing::error!(
184                                                "Failed to parse swap with error '{e:?}' for event: {log:?}",
185                                            );
186                                            continue;
187                                        }
188                                    }
189                                } else if event_signature == mint_event_encoded_signature {
190                                    match dex_extended.parse_mint_event(log.clone()) {
191                                        Ok(swap_event) => {
192                                            if let Err(e) =
193                                                tx.send(BlockchainMessage::MintEvent(swap_event))
194                                            {
195                                                tracing::error!("Failed to send mint event: {e}");
196                                            }
197                                        }
198                                        Err(e) => {
199                                            tracing::error!(
200                                                "Failed to parse mint with error '{e:?}' for event: {log:?}",
201                                            );
202                                            continue;
203                                        }
204                                    }
205                                } else if event_signature == burn_event_encoded_signature {
206                                    match dex_extended.parse_burn_event(log.clone()) {
207                                        Ok(swap_event) => {
208                                            if let Err(e) =
209                                                tx.send(BlockchainMessage::BurnEvent(swap_event))
210                                            {
211                                                tracing::error!("Failed to send burn event: {e}");
212                                            }
213                                        }
214                                        Err(e) => {
215                                            tracing::error!(
216                                                "Failed to parse burn with error '{e:?}' for event: {log:?}",
217                                            );
218                                            continue;
219                                        }
220                                    }
221                                } else {
222                                    tracing::error!("Unknown event signature: {event_signature}");
223                                    continue;
224                                }
225                            }
226                        }
227                    }
228                }
229            }
230        });
231
232        // Fire-and-forget: task is short-lived (processes one block), errors are logged,
233        // and it responds to cancellation_token for graceful shutdown
234    }
235
236    /// Creates a stream of contract event logs matching the specified criteria.
237    ///
238    /// # Panics
239    ///
240    /// Panics if the contract address cannot be parsed as a valid Ethereum address.
241    pub async fn request_contract_events_stream(
242        &self,
243        from_block: u64,
244        to_block: Option<u64>,
245        contract_address: &Address,
246        topics: Vec<&str>,
247    ) -> impl Stream<Item = Log> + use<> {
248        let query = Self::construct_contract_events_query(
249            from_block,
250            to_block,
251            vec![contract_address.clone()],
252            topics,
253        );
254
255        let mut rx = self
256            .client
257            .clone()
258            .stream(query, Default::default())
259            .await
260            .expect("Failed to create stream");
261
262        async_stream::stream! {
263              while let Some(response) = rx.recv().await {
264                let response = response.unwrap();
265
266                for batch in response.data.logs {
267                    for log in batch {
268                        yield log
269                    }
270                }
271            }
272        }
273    }
274
275    /// Disconnects from the HyperSync service and stops all background tasks.
276    pub async fn disconnect(&mut self) {
277        tracing::debug!("Disconnecting HyperSync client");
278        self.cancellation_token.cancel();
279
280        // Await blocks task with timeout, abort if it takes too long
281        if let Some(mut task) = self.blocks_task.take() {
282            match tokio::time::timeout(
283                std::time::Duration::from_secs(DISCONNECT_TIMEOUT_SECS),
284                &mut task,
285            )
286            .await
287            {
288                Ok(Ok(())) => {
289                    tracing::debug!("Blocks task completed gracefully");
290                }
291                Ok(Err(e)) => {
292                    tracing::error!("Error awaiting blocks task: {e}");
293                }
294                Err(_) => {
295                    tracing::warn!(
296                        "Blocks task did not complete within {DISCONNECT_TIMEOUT_SECS}s timeout, \
297                         aborting task (this is expected if Hypersync long-poll was in progress)"
298                    );
299                    task.abort();
300                    let _ = task.await;
301                }
302            }
303        }
304
305        // DEX event tasks are short-lived and self-clean via cancellation_token
306
307        tracing::debug!("HyperSync client disconnected");
308    }
309
310    /// Returns the current block
311    ///
312    /// # Panics
313    ///
314    /// Panics if the client height request fails.
315    pub async fn current_block(&self) -> u64 {
316        self.client.get_height().await.unwrap()
317    }
318
319    /// Creates a stream that yields blockchain blocks within the specified range.
320    ///
321    /// # Panics
322    ///
323    /// Panics if the stream creation or block transformation fails.
324    pub async fn request_blocks_stream(
325        &self,
326        from_block: u64,
327        to_block: Option<u64>,
328    ) -> impl Stream<Item = Block> {
329        let query = Self::construct_block_query(from_block, to_block);
330        let mut rx = self
331            .client
332            .clone()
333            .stream(query, Default::default())
334            .await
335            .unwrap();
336
337        let chain = self.chain.name;
338
339        async_stream::stream! {
340            while let Some(response) = rx.recv().await {
341                let response = response.unwrap();
342                for batch in response.data.blocks {
343                        for received_block in batch {
344                            let block = transform_hypersync_block(chain, received_block).unwrap();
345                            yield block
346                        }
347                    }
348            }
349        }
350    }
351
352    /// Starts a background task that continuously polls for new blockchain blocks.
353    ///
354    /// # Panics
355    ///
356    /// Panics if client height requests or block transformations fail.
357    pub fn subscribe_blocks(&mut self) {
358        if self.blocks_task.is_some() {
359            return;
360        }
361
362        let chain = self.chain.name;
363        let client = self.client.clone();
364        let tx = if let Some(tx) = &self.tx {
365            tx.clone()
366        } else {
367            tracing::error!("Hypersync client channel should have been initialized");
368            return;
369        };
370
371        // Create a child token that can be cancelled independently
372        let blocks_token = self.cancellation_token.child_token();
373        let cancellation_token = blocks_token.clone();
374        self.blocks_cancellation_token = Some(blocks_token);
375
376        let task = get_runtime().spawn(async move {
377            tracing::debug!("Starting task 'blocks_feed");
378
379            let current_block_height = client.get_height().await.unwrap();
380            let mut query = Self::construct_block_query(current_block_height, None);
381
382            loop {
383                tokio::select! {
384                    () = cancellation_token.cancelled() => {
385                        tracing::debug!("Blocks subscription task received cancellation signal");
386                        break;
387                    }
388                    result = tokio::time::timeout(
389                        std::time::Duration::from_secs(HYPERSYNC_REQUEST_TIMEOUT_SECS),
390                        client.get(&query)
391                    ) => {
392                        let response = match result {
393                            Ok(Ok(resp)) => resp,
394                            Ok(Err(e)) => {
395                                tracing::error!("Hypersync request failed: {e}");
396                                break;
397                            }
398                            Err(_) => {
399                                tracing::warn!("Hypersync request timed out after {HYPERSYNC_REQUEST_TIMEOUT_SECS}s, retrying...");
400                                continue;
401                            }
402                        };
403
404                        for batch in response.data.blocks {
405                            for received_block in batch {
406                                let block = transform_hypersync_block(chain, received_block).unwrap();
407                                let msg = BlockchainMessage::Block(block);
408                                if let Err(e) = tx.send(msg) {
409                                    log::error!("Error sending message: {e}");
410                                }
411                            }
412                        }
413
414                        if let Some(archive_block_height) = response.archive_height
415                            && archive_block_height < response.next_block
416                        {
417                            while client.get_height().await.unwrap() < response.next_block {
418                                tokio::select! {
419                                    () = cancellation_token.cancelled() => {
420                                        tracing::debug!("Blocks subscription task received cancellation signal during polling");
421                                        return;
422                                    }
423                                    () = tokio::time::sleep(std::time::Duration::from_millis(
424                                        BLOCK_POLLING_INTERVAL_MS,
425                                    )) => {}
426                                }
427                            }
428                        }
429
430                        query.from_block = response.next_block;
431                    }
432                }
433            }
434        });
435
436        self.blocks_task = Some(task);
437    }
438
439    /// Constructs a HyperSync query for fetching blocks with all available fields within the specified range.
440    fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
441        let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
442            .fields
443            .iter()
444            .map(|x| x.name.clone())
445            .collect();
446
447        Query {
448            from_block,
449            to_block,
450            blocks: vec![BlockSelection::default()],
451            field_selection: FieldSelection {
452                block: all_block_fields,
453                ..Default::default()
454            },
455            ..Default::default()
456        }
457    }
458
459    fn construct_contract_events_query(
460        from_block: u64,
461        to_block: Option<u64>,
462        contract_addresses: Vec<Address>,
463        topics: Vec<&str>,
464    ) -> Query {
465        let mut query_value = serde_json::json!({
466            "from_block": from_block,
467            "logs": [{
468                "topics": [topics],
469                "address": contract_addresses
470            }],
471            "field_selection": {
472                "log": [
473                    "block_number",
474                    "transaction_hash",
475                    "transaction_index",
476                    "log_index",
477                    "address",
478                    "data",
479                    "topic0",
480                    "topic1",
481                    "topic2",
482                    "topic3",
483                ]
484            }
485        });
486
487        if let Some(to_block) = to_block
488            && let Some(obj) = query_value.as_object_mut()
489        {
490            obj.insert("to_block".to_string(), serde_json::json!(to_block));
491        }
492
493        serde_json::from_value(query_value).unwrap()
494    }
495
496    /// Unsubscribes from new blocks by stopping the background watch task.
497    pub async fn unsubscribe_blocks(&mut self) {
498        if let Some(task) = self.blocks_task.take() {
499            // Cancel only the blocks child token, not the main cancellation token
500            if let Some(token) = self.blocks_cancellation_token.take() {
501                token.cancel();
502            }
503            if let Err(e) = task.await {
504                tracing::error!("Error awaiting blocks task during unsubscribe: {e}");
505            }
506            tracing::debug!("Unsubscribed from blocks");
507        }
508    }
509}