nautilus_blockchain/hypersync/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22    net_types::{BlockField, BlockSelection, FieldSelection, Query},
23    simple_types::Log,
24};
25use nautilus_common::runtime::get_runtime;
26use nautilus_model::{
27    defi::{Block, DexType, SharedChain},
28    identifiers::InstrumentId,
29};
30use reqwest::Url;
31
32use crate::{
33    exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
34    rpc::types::BlockchainMessage,
35};
36
/// The interval in milliseconds between height checks while waiting for
/// HyperSync to index the next block (used by the blocks subscription task).
const BLOCK_POLLING_INTERVAL_MS: u64 = 50;

/// Timeout in seconds applied to each HyperSync block request issued by the
/// blocks subscription task; a timed-out request is retried.
const HYPERSYNC_REQUEST_TIMEOUT_SECS: u64 = 30;

/// Timeout in seconds for graceful task shutdown during disconnect.
/// If the blocks task does not finish within this time, it is forcefully aborted.
const DISCONNECT_TIMEOUT_SECS: u64 = 5;
47
/// A client for interacting with a HyperSync API to retrieve blockchain data.
///
/// Besides one-shot streaming requests, the client can run a background task that
/// forwards newly indexed blocks over the `tx` channel.
#[derive(Debug)]
pub struct HyperSyncClient {
    /// The target blockchain identifier (e.g. Ethereum, Arbitrum).
    chain: SharedChain,
    /// The underlying HyperSync Rust client for making API requests.
    client: Arc<hypersync_client::Client>,
    /// Background task handle for the block subscription task.
    blocks_task: Option<tokio::task::JoinHandle<()>>,
    /// Cancellation token for the blocks subscription task.
    ///
    /// This is a child of `cancellation_token`, so the blocks task can be stopped
    /// on its own without cancelling the parent token.
    blocks_cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Channel for sending blockchain messages to the adapter data client.
    tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    /// Index of pool addresses keyed by instrument ID.
    pool_addresses: AHashMap<InstrumentId, Address>,
    /// Cancellation token for graceful shutdown of background tasks.
    cancellation_token: tokio_util::sync::CancellationToken,
}
66
67impl HyperSyncClient {
68    /// Creates a new [`HyperSyncClient`] instance for the given chain and message sender.
69    ///
70    /// # Panics
71    ///
72    /// Panics if:
73    /// - The chain's `hypersync_url` is invalid.
74    /// - The `ENVIO_API_TOKEN` environment variable is not set or invalid.
75    /// - The underlying client cannot be initialized.
76    #[must_use]
77    pub fn new(
78        chain: SharedChain,
79        tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
80        cancellation_token: tokio_util::sync::CancellationToken,
81    ) -> Self {
82        let mut config = hypersync_client::ClientConfig::default();
83        let hypersync_url =
84            Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
85        config.url = hypersync_url.to_string();
86        config.api_token = std::env::var("ENVIO_API_TOKEN")
87            .expect("ENVIO_API_TOKEN environment variable must be set");
88        let client = hypersync_client::Client::new(config)
89            .expect("Failed to create HyperSync client - check ENVIO_API_TOKEN is a valid UUID");
90
91        Self {
92            chain,
93            client: Arc::new(client),
94            blocks_task: None,
95            blocks_cancellation_token: None,
96            tx,
97            pool_addresses: AHashMap::new(),
98            cancellation_token,
99        }
100    }
101
102    #[must_use]
103    pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
104        self.pool_addresses.get(&instrument_id)
105    }
106
107    /// Processes DEX contract events for a specific block.
108    ///
109    /// # Panics
110    ///
111    /// Panics if the DEX extended configuration cannot be retrieved or if stream creation fails.
112    pub fn process_block_dex_contract_events(
113        &mut self,
114        dex: &DexType,
115        block: u64,
116        contract_addresses: Vec<Address>,
117        swap_event_encoded_signature: String,
118        mint_event_encoded_signature: String,
119        burn_event_encoded_signature: String,
120    ) {
121        let topics = vec![
122            swap_event_encoded_signature.as_str(),
123            &mint_event_encoded_signature.as_str(),
124            &burn_event_encoded_signature.as_str(),
125        ];
126        let query = Self::construct_contract_events_query(
127            block,
128            Some(block + 1),
129            contract_addresses,
130            topics,
131        );
132        let tx = if let Some(tx) = &self.tx {
133            tx.clone()
134        } else {
135            tracing::error!("Hypersync client channel should have been initialized");
136            return;
137        };
138        let client = self.client.clone();
139        let dex_extended =
140            get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
141        let cancellation_token = self.cancellation_token.clone();
142
143        let _task = get_runtime().spawn(async move {
144            let mut rx = match client.stream(query, Default::default()).await {
145                Ok(rx) => rx,
146                Err(e) => {
147                    tracing::error!("Failed to create DEX event stream: {e}");
148                    return;
149                }
150            };
151
152            loop {
153                tokio::select! {
154                    () = cancellation_token.cancelled() => {
155                        tracing::debug!("DEX event processing task received cancellation signal");
156                        break;
157                    }
158                    response = rx.recv() => {
159                        let Some(response) = response else {
160                            break;
161                        };
162
163                        let response = match response {
164                            Ok(resp) => resp,
165                            Err(e) => {
166                                tracing::error!("Failed to receive DEX event stream response: {e}");
167                                break;
168                            }
169                        };
170
171                        for batch in response.data.logs {
172                            for log in batch {
173                                let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
174                                    Some(log_argument) => {
175                                        format!("0x{}", hex::encode(log_argument.as_ref()))
176                                    }
177                                    None => continue,
178                                };
179                                if event_signature == swap_event_encoded_signature {
180                                    match dex_extended.parse_swap_event(log.clone()) {
181                                        Ok(swap_event) => {
182                                            if let Err(e) =
183                                                tx.send(BlockchainMessage::SwapEvent(swap_event))
184                                            {
185                                                tracing::error!("Failed to send swap event: {e}");
186                                            }
187                                        }
188                                        Err(e) => {
189                                            tracing::error!(
190                                                "Failed to parse swap with error '{e:?}' for event: {log:?}",
191                                            );
192                                            continue;
193                                        }
194                                    }
195                                } else if event_signature == mint_event_encoded_signature {
196                                    match dex_extended.parse_mint_event(log.clone()) {
197                                        Ok(swap_event) => {
198                                            if let Err(e) =
199                                                tx.send(BlockchainMessage::MintEvent(swap_event))
200                                            {
201                                                tracing::error!("Failed to send mint event: {e}");
202                                            }
203                                        }
204                                        Err(e) => {
205                                            tracing::error!(
206                                                "Failed to parse mint with error '{e:?}' for event: {log:?}",
207                                            );
208                                            continue;
209                                        }
210                                    }
211                                } else if event_signature == burn_event_encoded_signature {
212                                    match dex_extended.parse_burn_event(log.clone()) {
213                                        Ok(swap_event) => {
214                                            if let Err(e) =
215                                                tx.send(BlockchainMessage::BurnEvent(swap_event))
216                                            {
217                                                tracing::error!("Failed to send burn event: {e}");
218                                            }
219                                        }
220                                        Err(e) => {
221                                            tracing::error!(
222                                                "Failed to parse burn with error '{e:?}' for event: {log:?}",
223                                            );
224                                            continue;
225                                        }
226                                    }
227                                } else {
228                                    tracing::error!("Unknown event signature: {event_signature}");
229                                    continue;
230                                }
231                            }
232                        }
233                    }
234                }
235            }
236        });
237
238        // Fire-and-forget: task is short-lived (processes one block), errors are logged,
239        // and it responds to cancellation_token for graceful shutdown
240    }
241
242    /// Creates a stream of contract event logs matching the specified criteria.
243    ///
244    /// # Panics
245    ///
246    /// Panics if the contract address cannot be parsed as a valid Ethereum address.
247    pub async fn request_contract_events_stream(
248        &self,
249        from_block: u64,
250        to_block: Option<u64>,
251        contract_address: &Address,
252        topics: Vec<&str>,
253    ) -> impl Stream<Item = Log> + use<> {
254        let query = Self::construct_contract_events_query(
255            from_block,
256            to_block,
257            vec![*contract_address],
258            topics,
259        );
260
261        let mut rx = self
262            .client
263            .clone()
264            .stream(query, Default::default())
265            .await
266            .expect("Failed to create stream");
267
268        async_stream::stream! {
269              while let Some(response) = rx.recv().await {
270                let response = response.unwrap();
271
272                for batch in response.data.logs {
273                    for log in batch {
274                        yield log
275                    }
276                }
277            }
278        }
279    }
280
281    /// Disconnects from the HyperSync service and stops all background tasks.
282    pub async fn disconnect(&mut self) {
283        tracing::debug!("Disconnecting HyperSync client");
284        self.cancellation_token.cancel();
285
286        // Await blocks task with timeout, abort if it takes too long
287        if let Some(mut task) = self.blocks_task.take() {
288            match tokio::time::timeout(
289                std::time::Duration::from_secs(DISCONNECT_TIMEOUT_SECS),
290                &mut task,
291            )
292            .await
293            {
294                Ok(Ok(())) => {
295                    tracing::debug!("Blocks task completed gracefully");
296                }
297                Ok(Err(e)) => {
298                    tracing::error!("Error awaiting blocks task: {e}");
299                }
300                Err(_) => {
301                    tracing::warn!(
302                        "Blocks task did not complete within {DISCONNECT_TIMEOUT_SECS}s timeout, \
303                         aborting task (this is expected if Hypersync long-poll was in progress)"
304                    );
305                    task.abort();
306                    let _ = task.await;
307                }
308            }
309        }
310
311        // DEX event tasks are short-lived and self-clean via cancellation_token
312
313        tracing::debug!("HyperSync client disconnected");
314    }
315
316    /// Returns the current block
317    ///
318    /// # Panics
319    ///
320    /// Panics if the client height request fails.
321    pub async fn current_block(&self) -> u64 {
322        self.client.get_height().await.unwrap()
323    }
324
325    /// Creates a stream that yields blockchain blocks within the specified range.
326    ///
327    /// # Panics
328    ///
329    /// Panics if the stream creation or block transformation fails.
330    pub async fn request_blocks_stream(
331        &self,
332        from_block: u64,
333        to_block: Option<u64>,
334    ) -> impl Stream<Item = Block> {
335        let query = Self::construct_block_query(from_block, to_block);
336        let mut rx = self
337            .client
338            .clone()
339            .stream(query, Default::default())
340            .await
341            .unwrap();
342
343        let chain = self.chain.name;
344
345        async_stream::stream! {
346            while let Some(response) = rx.recv().await {
347                let response = response.unwrap();
348                for batch in response.data.blocks {
349                        for received_block in batch {
350                            let block = transform_hypersync_block(chain, received_block).unwrap();
351                            yield block
352                        }
353                    }
354            }
355        }
356    }
357
358    /// Starts a background task that continuously polls for new blockchain blocks.
359    ///
360    /// # Panics
361    ///
362    /// Panics if client height requests or block transformations fail.
363    pub fn subscribe_blocks(&mut self) {
364        if self.blocks_task.is_some() {
365            return;
366        }
367
368        let chain = self.chain.name;
369        let client = self.client.clone();
370        let tx = if let Some(tx) = &self.tx {
371            tx.clone()
372        } else {
373            tracing::error!("Hypersync client channel should have been initialized");
374            return;
375        };
376
377        // Create a child token that can be cancelled independently
378        let blocks_token = self.cancellation_token.child_token();
379        let cancellation_token = blocks_token.clone();
380        self.blocks_cancellation_token = Some(blocks_token);
381
382        let task = get_runtime().spawn(async move {
383            tracing::debug!("Starting task 'blocks_feed");
384
385            let current_block_height = client.get_height().await.unwrap();
386            let mut query = Self::construct_block_query(current_block_height, None);
387
388            loop {
389                tokio::select! {
390                    () = cancellation_token.cancelled() => {
391                        tracing::debug!("Blocks subscription task received cancellation signal");
392                        break;
393                    }
394                    result = tokio::time::timeout(
395                        std::time::Duration::from_secs(HYPERSYNC_REQUEST_TIMEOUT_SECS),
396                        client.get(&query)
397                    ) => {
398                        let response = match result {
399                            Ok(Ok(resp)) => resp,
400                            Ok(Err(e)) => {
401                                tracing::error!("Hypersync request failed: {e}");
402                                break;
403                            }
404                            Err(_) => {
405                                tracing::warn!("Hypersync request timed out after {HYPERSYNC_REQUEST_TIMEOUT_SECS}s, retrying...");
406                                continue;
407                            }
408                        };
409
410                        for batch in response.data.blocks {
411                            for received_block in batch {
412                                let block = transform_hypersync_block(chain, received_block).unwrap();
413                                let msg = BlockchainMessage::Block(block);
414                                if let Err(e) = tx.send(msg) {
415                                    log::error!("Error sending message: {e}");
416                                }
417                            }
418                        }
419
420                        if let Some(archive_block_height) = response.archive_height
421                            && archive_block_height < response.next_block
422                        {
423                            while client.get_height().await.unwrap() < response.next_block {
424                                tokio::select! {
425                                    () = cancellation_token.cancelled() => {
426                                        tracing::debug!("Blocks subscription task received cancellation signal during polling");
427                                        return;
428                                    }
429                                    () = tokio::time::sleep(std::time::Duration::from_millis(
430                                        BLOCK_POLLING_INTERVAL_MS,
431                                    )) => {}
432                                }
433                            }
434                        }
435
436                        query.from_block = response.next_block;
437                    }
438                }
439            }
440        });
441
442        self.blocks_task = Some(task);
443    }
444
445    /// Constructs a HyperSync query for fetching blocks with all available fields within the specified range.
446    fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
447        Query {
448            from_block,
449            to_block,
450            blocks: vec![BlockSelection::default()],
451            field_selection: FieldSelection {
452                block: BlockField::all(),
453                ..Default::default()
454            },
455            ..Default::default()
456        }
457    }
458
459    fn construct_contract_events_query(
460        from_block: u64,
461        to_block: Option<u64>,
462        contract_addresses: Vec<Address>,
463        topics: Vec<&str>,
464    ) -> Query {
465        let mut query_value = serde_json::json!({
466            "from_block": from_block,
467            "logs": [{
468                "topics": [topics],
469                "address": contract_addresses
470            }],
471            "field_selection": {
472                "log": [
473                    "block_number",
474                    "transaction_hash",
475                    "transaction_index",
476                    "log_index",
477                    "address",
478                    "data",
479                    "topic0",
480                    "topic1",
481                    "topic2",
482                    "topic3",
483                ]
484            }
485        });
486
487        if let Some(to_block) = to_block
488            && let Some(obj) = query_value.as_object_mut()
489        {
490            obj.insert("to_block".to_string(), serde_json::json!(to_block));
491        }
492
493        serde_json::from_value(query_value).unwrap()
494    }
495
496    /// Unsubscribes from new blocks by stopping the background watch task.
497    pub async fn unsubscribe_blocks(&mut self) {
498        if let Some(task) = self.blocks_task.take() {
499            // Cancel only the blocks child token, not the main cancellation token
500            if let Some(token) = self.blocks_cancellation_token.take() {
501                token.cancel();
502            }
503            if let Err(e) = task.await {
504                tracing::error!("Error awaiting blocks task during unsubscribe: {e}");
505            }
506            tracing::debug!("Unsubscribed from blocks");
507        }
508    }
509}