nautilus_blockchain/services/pool_discovery.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{cmp::max, collections::HashSet};

use alloy::primitives::Address;
use futures_util::StreamExt;
use nautilus_model::defi::{
    SharedDex,
    amm::Pool,
    chain::SharedChain,
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
    token::Token,
};
use thousands::Separable;
use tokio_util::sync::CancellationToken;

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::erc20::Erc20Contract,
    events::pool_created::PoolCreatedEvent,
    exchanges::extended::DexExtended,
    hypersync::{client::HyperSyncClient, helpers::extract_block_number},
};

/// Number of blocks between progress log reports during pool sync.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
/// Number of buffered pool-creation events that triggers a batch database flush.
const POOL_DB_BATCH_SIZE: usize = 2000;

/// Sanitizes a string by removing null bytes and other characters that are invalid in
/// PostgreSQL UTF-8 text fields.
///
/// This function strips null bytes (0x00) and other problematic control characters, which are
/// common in malformed on-chain token metadata. Printable characters and common whitespace
/// (space, tab, newline, carriage return) are preserved.
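///
/// # Examples
///
/// Illustrative behavior (doctest marked `ignore` since this helper is private):
///
/// ```ignore
/// // Null bytes are stripped; common whitespace survives.
/// assert_eq!(sanitize_string("USD\u{0}C".to_string()), "USDC");
/// assert_eq!(sanitize_string("a\tb\n".to_string()), "a\tb\n");
/// ```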
fn sanitize_string(s: String) -> String {
    s.chars()
        .filter(|c| {
            // Keep printable characters and common whitespace, but filter null bytes
            // and other problematic control characters
            *c != '\0' && (*c >= ' ' || *c == '\t' || *c == '\n' || *c == '\r')
        })
        .collect()
}

/// Service responsible for discovering DEX liquidity pools from blockchain events.
///
/// This service handles the synchronization of pool creation events from various DEXes,
/// managing token metadata fetching, buffering strategies, and database persistence.
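///
/// Buffering is two-level: token addresses accumulate in small RPC batches (sized by the
/// multicall budget), while fetched tokens and pool events accumulate in larger batches for
/// database writes, with tokens always flushed before pools to satisfy foreign key constraints.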
#[derive(Debug)]
pub struct PoolDiscoveryService<'a> {
    /// The blockchain network being synced.
    chain: SharedChain,
    /// Cache for tokens and pools.
    cache: &'a mut BlockchainCache,
    /// ERC20 contract interface for fetching token metadata.
    erc20_contract: &'a Erc20Contract,
    /// HyperSync client for event streaming.
    hypersync_client: &'a HyperSyncClient,
    /// Cancellation token for graceful shutdown.
    cancellation_token: CancellationToken,
    /// Configuration for sync operations.
    config: BlockchainDataClientConfig,
}

impl<'a> PoolDiscoveryService<'a> {
    /// Creates a new [`PoolDiscoveryService`] instance.
    #[must_use]
    pub const fn new(
        chain: SharedChain,
        cache: &'a mut BlockchainCache,
        erc20_contract: &'a Erc20Contract,
        hypersync_client: &'a HyperSyncClient,
        cancellation_token: CancellationToken,
        config: BlockchainDataClientConfig,
    ) -> Self {
        Self {
            chain,
            cache,
            erc20_contract,
            hypersync_client,
            cancellation_token,
            config,
        }
    }

    /// Synchronizes pools for a specific DEX within a given block range.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - HyperSync streaming fails
    /// - Token RPC calls fail
    /// - Database operations fail
    /// - Sync is cancelled
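    ///
    /// # Examples
    ///
    /// Illustrative usage (marked `ignore`; assumes an already constructed service and DEX):
    ///
    /// ```ignore
    /// // Resume from block 10_000_000 up to the current chain head (reset = false).
    /// service.sync_pools(&dex, 10_000_000, None, false).await?;
    /// ```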
    pub async fn sync_pools(
        &mut self,
        dex: &DexExtended,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        // Determine effective sync range
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(&dex.dex.name).await?;
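            // e.g., from_block = 100 with last_synced = 250 resumes at block 251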
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Skip sync if already up to date
        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex.dex.name,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            },
        );
        tracing::info!(
            "Syncing {} pool creation events from factory contract {} on chain {}",
            dex.dex.name,
            dex.factory,
            self.chain.name
        );

        // Enable performance settings for sync operations
        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        tokio::pin!(pools_stream);

        // LEVEL 1: RPC buffers (small, constrained by rate limits)
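        // Each token needs three metadata calls (name, symbol, decimals), hence the divide-by-3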
        let token_rpc_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
        let mut token_rpc_buffer: HashSet<Address> = HashSet::new();

        // LEVEL 2: DB buffers (large, optimize for throughput)
        let mut token_db_buffer: Vec<Token> = Vec::new();
        let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();

        let mut last_block_saved = effective_from_block;

        // Tracking counters
        let mut total_discovered = 0;
        let mut total_skipped_exists = 0;
        let mut total_skipped_invalid_tokens = 0;
        let mut total_saved = 0;

        let cancellation_token = self.cancellation_token.clone();
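        // Race the sync loop against cancellation so a shutdown request interrupts it promptly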
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                tracing::info!("Exchange pool sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }

            result = async {
                while let Some(log) = pools_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    let blocks_progress = block_number - last_block_saved;
                    last_block_saved = block_number;

                    let pool = dex.parse_pool_created_event_hypersync(log)?;
                    total_discovered += 1;

                    if self.cache.get_pool(&pool.pool_identifier).is_some() {
                        // Pool is already initialized and cached.
                        total_skipped_exists += 1;
                        continue;
                    }

                    if self.cache.is_invalid_token(&pool.token0)
                        || self.cache.is_invalid_token(&pool.token1)
                    {
                        // Skip pools with invalid tokens as they cannot be properly processed or traded.
                        total_skipped_invalid_tokens += 1;
                        continue;
                    }

                    // Collect tokens needed for RPC fetch
                    if self.cache.get_token(&pool.token0).is_none() {
                        token_rpc_buffer.insert(pool.token0);
                    }
                    if self.cache.get_token(&pool.token1).is_none() {
                        token_rpc_buffer.insert(pool.token1);
                    }

                    // Buffer the pool for later processing
                    pool_events_buffer.push(pool);

                    // ==== RPC FLUSHING (small batches) ====
                    if token_rpc_buffer.len() >= token_rpc_batch_size {
                        let fetched_tokens = self
                            .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                            .await?;

                        // Accumulate for later DB write
                        token_db_buffer.extend(fetched_tokens);
                    }

                    // ==== DB FLUSHING (large batches) ====
                    // Process pools when buffer is full
                    if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
                        // 1. Fetch any remaining tokens in RPC buffer (needed for pool construction)
                        if !token_rpc_buffer.is_empty() {
                            let fetched_tokens = self
                                .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                                .await?;
                            token_db_buffer.extend(fetched_tokens);
                        }

                        // 2. Flush ALL tokens to DB (satisfy foreign key constraints)
                        if !token_db_buffer.is_empty() {
                            self.cache
                                .add_tokens_batch(std::mem::take(&mut token_db_buffer))
                                .await?;
                        }

                        // 3. Now safe to construct and flush pools
                        let pools = self
                            .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                            .await?;
                        total_saved += pools.len();
                        self.cache.add_pools_batch(pools).await?;
                    }

                    metrics.update(blocks_progress as usize);
                    // Log progress if needed
                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                // ==== FINAL FLUSH (all remaining data) ====
                // 1. Fetch any remaining tokens
                if !token_rpc_buffer.is_empty() {
                    let fetched_tokens = self
                        .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                        .await?;
                    token_db_buffer.extend(fetched_tokens);
                }

                // 2. Flush all tokens to DB
                if !token_db_buffer.is_empty() {
                    self.cache
                        .add_tokens_batch(std::mem::take(&mut token_db_buffer))
                        .await?;
                }

                // 3. Process and flush all pools
                if !pool_events_buffer.is_empty() {
                    let pools = self
                        .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                        .await?;
                    total_saved += pools.len();
                    self.cache.add_pools_batch(pools).await?;
                }

                metrics.log_final_stats();

                // Update the last synced block after successful completion.
                self.cache
                    .update_dex_last_synced_block(&dex.dex.name, to_block)
                    .await?;

                tracing::info!(
                    "Successfully synced DEX {} pools up to block {} | Summary: discovered={}, saved={}, skipped_exists={}, skipped_invalid_tokens={}",
                    dex.dex.name,
                    to_block.separate_with_commas(),
                    total_discovered,
                    total_saved,
                    total_skipped_exists,
                    total_skipped_invalid_tokens
                );

                Ok(())
            } => result
        };

        // Restore default safe settings regardless of the sync outcome, then propagate any error
        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        sync_result
    }

    /// Fetches token metadata via RPC and updates the in-memory cache immediately.
    ///
    /// This method drains `token_buffer`, fetches token information using multicall, updates the
    /// in-memory cache right away (so pool construction can proceed), and returns the valid
    /// tokens for a later batch DB write. Tokens whose metadata cannot be fetched are recorded
    /// as invalid.
    ///
    /// # Errors
    ///
    /// Returns an error if the RPC multicall fails or database operations fail.
    async fn fetch_and_cache_tokens_in_memory(
        &mut self,
        token_buffer: &mut HashSet<Address>,
    ) -> anyhow::Result<Vec<Token>> {
        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
        let token_infos = self
            .erc20_contract
            .batch_fetch_token_info(&batch_addresses)
            .await?;

        let mut valid_tokens = Vec::new();

        for (token_address, token_info) in token_infos {
            match token_info {
                Ok(token_info) => {
                    // Sanitize token metadata to remove null bytes and invalid UTF-8 characters
                    let sanitized_name = sanitize_string(token_info.name);
                    let sanitized_symbol = sanitize_string(token_info.symbol);

                    let token = Token::new(
                        self.chain.clone(),
                        token_address,
                        sanitized_name,
                        sanitized_symbol,
                        token_info.decimals,
                    );

                    // Update in-memory cache IMMEDIATELY (so construct_pool can read it)
                    self.cache.insert_token_in_memory(token.clone());

                    // Collect for LATER DB write
                    valid_tokens.push(token);
                }
                Err(token_info_error) => {
                    self.cache.insert_invalid_token_in_memory(token_address);
                    if let Some(database) = &self.cache.database {
                        let sanitized_error = sanitize_string(token_info_error.to_string());
                        database
                            .add_invalid_token(
                                self.chain.chain_id,
                                &token_address,
                                &sanitized_error,
                            )
                            .await?;
                    }
                }
            }
        }

        Ok(valid_tokens)
    }

    /// Constructs multiple pools from pool creation events, draining the event buffer.
    ///
    /// Assumes all required tokens are already in the in-memory cache.
    ///
    /// # Errors
    ///
    /// Does not currently return errors; pools that cannot be constructed (missing tokens)
    /// are logged and skipped rather than failing the entire batch.
    async fn construct_pools_batch(
        &mut self,
        pool_events: &mut Vec<PoolCreatedEvent>,
        dex: &SharedDex,
    ) -> anyhow::Result<Vec<Pool>> {
        let mut pools = Vec::with_capacity(pool_events.len());

        for pool_event in pool_events.drain(..) {
            // Both tokens should be in cache now
            let token0 = match self.cache.get_token(&pool_event.token0) {
                Some(token) => token.clone(),
                None => {
                    if !self.cache.is_invalid_token(&pool_event.token0) {
                        tracing::warn!(
                            "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
                            pool_event.pool_address,
                            pool_event.token0
                        );
                    }
                    continue;
                }
            };

            let token1 = match self.cache.get_token(&pool_event.token1) {
                Some(token) => token.clone(),
                None => {
                    if !self.cache.is_invalid_token(&pool_event.token1) {
                        tracing::warn!(
                            "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
                            pool_event.pool_address,
                            pool_event.token1
                        );
                    }
                    continue;
                }
            };

            let mut pool = Pool::new(
                self.chain.clone(),
                dex.clone(),
                pool_event.pool_address,
                pool_event.pool_identifier,
                pool_event.block_number,
                token0,
                token1,
                pool_event.fee,
                pool_event.tick_spacing,
                nautilus_core::UnixNanos::default(),
            );

            // Set hooks if available (UniswapV4)
            if let Some(hooks) = pool_event.hooks {
                pool.set_hooks(hooks);
            }

            // Initialize pool with sqrt_price_x96 and tick if available (UniswapV4)
            if let (Some(sqrt_price_x96), Some(tick)) = (pool_event.sqrt_price_x96, pool_event.tick)
            {
                pool.initialize(sqrt_price_x96, tick);
            }

            pools.push(pool);
        }

        Ok(pools)
    }
}