nautilus_blockchain/data/core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp::max, collections::HashSet, sync::Arc};
17
18use alloy::primitives::Address;
19use futures_util::StreamExt;
20use nautilus_common::messages::DataEvent;
21use nautilus_core::UnixNanos;
22use nautilus_model::defi::{
23    Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolProfiler, PoolSwap, SharedChain,
24    SharedDex, SharedPool, Token,
25    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
26    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
27    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
28};
29use thousands::Separable;
30
31use crate::{
32    cache::BlockchainCache,
33    config::BlockchainDataClientConfig,
34    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
35    data::subscription::DefiDataSubscriptionManager,
36    events::{
37        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent,
38        pool_created::PoolCreatedEvent, swap::SwapEvent,
39    },
40    exchanges::{extended::DexExtended, get_dex_extended},
41    hypersync::{
42        client::HyperSyncClient,
43        helpers::{extract_block_number, extract_event_signature_bytes},
44    },
45    rpc::{
46        BlockchainRpcClient, BlockchainRpcClientAny,
47        chains::{
48            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
49            polygon::PolygonRpcClient,
50        },
51        http::BlockchainHttpRpcClient,
52        types::BlockchainMessage,
53    },
54};
55
56const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
57
58/// Core blockchain data client responsible for fetching, processing, and caching blockchain data.
59///
60/// This struct encapsulates the core functionality for interacting with blockchain networks,
61/// including syncing historical data, processing real-time events, and managing cached entities.
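///
/// # Example
///
/// A minimal usage sketch (not a complete program), assuming a populated
/// `BlockchainDataClientConfig`; the channel and cancellation-token wiring shown here is
/// illustrative:
///
/// ```ignore
/// let (data_tx, _data_rx) = tokio::sync::mpsc::unbounded_channel();
/// let cancellation_token = tokio_util::sync::CancellationToken::new();
/// let mut core = BlockchainDataClientCore::new(config, None, Some(data_tx), cancellation_token);
/// core.connect().await?;
/// ```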
62#[derive(Debug)]
63pub struct BlockchainDataClientCore {
64    /// The blockchain being targeted by this client instance.
65    pub chain: SharedChain,
66    /// The configuration for the data client.
67    pub config: BlockchainDataClientConfig,
68    /// Local cache for blockchain entities.
69    pub cache: BlockchainCache,
70    /// Interface for interacting with ERC20 token contracts.
71    tokens: Erc20Contract,
72    /// Interface for interacting with UniswapV3 pool contracts.
73    univ3_pool: UniswapV3PoolContract,
74    /// Client for the HyperSync data indexing service.
75    pub hypersync_client: HyperSyncClient,
76    /// Optional WebSocket RPC client for direct blockchain node communication.
77    pub rpc_client: Option<BlockchainRpcClientAny>,
78    /// Manages subscriptions for various DEX events (swaps, mints, burns).
79    pub subscription_manager: DefiDataSubscriptionManager,
80    /// Channel sender for data events.
81    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
82    /// Cancellation token for graceful shutdown of long-running operations.
83    cancellation_token: tokio_util::sync::CancellationToken,
84}
85
86impl BlockchainDataClientCore {
87    /// Creates a new instance of [`BlockchainDataClientCore`].
88    ///
89    /// # Panics
90    ///
91    /// Panics if `use_hypersync_for_live_data` is false and `wss_rpc_url` is set for a blockchain without a supported RPC client.
92    #[must_use]
93    pub fn new(
94        config: BlockchainDataClientConfig,
95        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
96        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
97        cancellation_token: tokio_util::sync::CancellationToken,
98    ) -> Self {
99        let chain = config.chain.clone();
100        let cache = BlockchainCache::new(chain.clone());
101
102        // Log RPC endpoints being used
103        tracing::info!(
104            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
105            chain.name,
106            config.http_rpc_url
107        );
108
109        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
110            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
111            tracing::info!("WebSocket RPC URL: {}", wss_rpc_url);
112            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
113        } else {
114            tracing::info!("Using HyperSync for live data (no WebSocket RPC)");
115            None
116        };
117        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
118            config.http_rpc_url.clone(),
119            config.rpc_requests_per_second,
120        ));
121        let erc20_contract = Erc20Contract::new(
122            http_rpc_client.clone(),
123            config.pool_filters.remove_pools_with_empty_erc20fields,
124        );
125
126        let hypersync_client =
127            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
128        Self {
129            chain,
130            config,
131            rpc_client,
132            tokens: erc20_contract,
133            univ3_pool: UniswapV3PoolContract::new(http_rpc_client),
134            cache,
135            hypersync_client,
136            subscription_manager: DefiDataSubscriptionManager::new(),
137            data_tx,
138            cancellation_token,
139        }
140    }
141
142    /// Initializes the database connection for the blockchain cache.
143    pub async fn initialize_cache_database(&mut self) {
144        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
145            tracing::info!(
146                "Initializing blockchain cache on database '{}'",
147                pg_connect_options.database
148            );
149            self.cache
150                .initialize_database(pg_connect_options.clone().into())
151                .await;
152        }
153    }
154
155    /// Creates an appropriate blockchain RPC client for the specified blockchain.
156    fn initialize_rpc_client(
157        blockchain: Blockchain,
158        wss_rpc_url: String,
159    ) -> BlockchainRpcClientAny {
160        match blockchain {
161            Blockchain::Ethereum => {
162                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
163            }
164            Blockchain::Polygon => {
165                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
166            }
167            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
168            Blockchain::Arbitrum => {
169                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
170            }
171            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
172        }
173    }
174
175    /// Establishes connections to all configured data sources and initializes the cache.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if cache initialization or connection setup fails.
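    ///
    /// # Example
    ///
    /// A minimal sketch: connecting registers each DEX listed in the client config and syncs its
    /// pools from the determined start block.
    ///
    /// ```ignore
    /// core.connect().await?;
    /// ```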
180    pub async fn connect(&mut self) -> anyhow::Result<()> {
181        tracing::info!(
182            "Connecting blockchain data client for '{}'",
183            self.chain.name
184        );
185        self.initialize_cache_database().await;
186
187        if let Some(ref mut rpc_client) = self.rpc_client {
188            rpc_client.connect().await?;
189        }
190
191        let from_block = self.determine_from_block();
192
193        tracing::info!(
194            "Connecting to blockchain data source for '{}' from block {}",
195            self.chain.name,
196            from_block.separate_with_commas()
197        );
198
199        // Initialize the chain and register the Dex exchanges in the cache.
200        self.cache.initialize_chain().await;
201        // Import the cached blockchain data.
202        self.cache.connect(from_block).await?;
203        // TODO: Block syncing is disabled for now because block timestamps are not yet configured.
204        // Sync the remaining missing blocks once enabled:
205        // self.sync_blocks(from_block, None, true).await?;
206        for dex in self.config.dex_ids.clone() {
207            self.register_dex_exchange(dex).await?;
208            self.sync_exchange_pools(&dex, from_block, None, false)
209                .await?;
210        }
211
212        Ok(())
213    }
214
215    /// Syncs blocks with consistency checks to ensure data integrity.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if block syncing fails or if consistency checks fail.
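    ///
    /// # Example
    ///
    /// A minimal sketch with an illustrative starting block. Any gap detected below the cached
    /// max block is filled with INSERTs first, after which the fast COPY path resumes from the
    /// block following the cached maximum:
    ///
    /// ```ignore
    /// core.sync_blocks_checked(18_000_000, None).await?;
    /// ```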
220    pub async fn sync_blocks_checked(
221        &mut self,
222        from_block: u64,
223        to_block: Option<u64>,
224    ) -> anyhow::Result<()> {
225        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
226            // If blocks are consistent, proceed with the COPY command.
227            if blocks_status.is_consistent() {
228                tracing::info!(
229                    "Cache is consistent: no gaps detected (last continuous block: {})",
230                    blocks_status.last_continuous_block
231                );
232                let target_block = max(blocks_status.max_block + 1, from_block);
233                tracing::info!(
234                    "Starting fast sync with COPY from block {}",
235                    target_block.separate_with_commas()
236                );
237                self.sync_blocks(target_block, to_block, true).await?;
238            } else {
239                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
240                tracing::info!(
241                    "Cache inconsistency detected: {} blocks missing between {} and {}",
242                    gap_size,
243                    blocks_status.last_continuous_block + 1,
244                    blocks_status.max_block
245                );
246
247                tracing::info!(
248                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
249                    blocks_status.last_continuous_block + 1,
250                    blocks_status.max_block
251                );
252                self.sync_blocks(
253                    blocks_status.last_continuous_block + 1,
254                    Some(blocks_status.max_block),
255                    false,
256                )
257                .await?;
258
259                tracing::info!(
260                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
261                    (blocks_status.max_block + 1).separate_with_commas()
262                );
263                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
264                    .await?;
265            }
266        } else {
267            self.sync_blocks(from_block, to_block, true).await?;
268        }
269
270        Ok(())
271    }
272
273    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if block fetching, caching, or database operations fail.
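    ///
    /// # Example
    ///
    /// A minimal sketch with an illustrative starting block; `None` for `to_block` syncs up to
    /// the current chain head, and `true` selects the bulk COPY insert path:
    ///
    /// ```ignore
    /// core.sync_blocks(18_000_000, None, true).await?;
    /// ```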
278    pub async fn sync_blocks(
279        &mut self,
280        from_block: u64,
281        to_block: Option<u64>,
282        use_copy_command: bool,
283    ) -> anyhow::Result<()> {
284        let to_block = if let Some(block) = to_block {
285            block
286        } else {
287            self.hypersync_client.current_block().await
288        };
289        let total_blocks = to_block.saturating_sub(from_block) + 1;
290        tracing::info!(
291            "Syncing blocks from {} to {} (total: {} blocks)",
292            from_block.separate_with_commas(),
293            to_block.separate_with_commas(),
294            total_blocks.separate_with_commas()
295        );
296
297        // Enable performance settings for sync operations
298        if let Err(e) = self.cache.toggle_performance_settings(true).await {
299            tracing::warn!("Failed to enable performance settings: {e}");
300        }
301
302        let blocks_stream = self
303            .hypersync_client
304            .request_blocks_stream(from_block, Some(to_block))
305            .await;
306
307        tokio::pin!(blocks_stream);
308
309        let mut metrics = BlockchainSyncReporter::new(
310            BlockchainSyncReportItems::Blocks,
311            from_block,
312            total_blocks,
313            BLOCKS_PROCESS_IN_SYNC_REPORT,
314        );
315
316        // Batch configuration
317        const BATCH_SIZE: usize = 1000;
318        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);
319
320        let cancellation_token = self.cancellation_token.clone();
321        let sync_result = tokio::select! {
322            () = cancellation_token.cancelled() => {
323                tracing::info!("Block sync cancelled");
324                Err(anyhow::anyhow!("Sync cancelled"))
325            }
326            result = async {
327                while let Some(block) = blocks_stream.next().await {
328                    let block_number = block.number;
329                    if self.cache.get_block_timestamp(block_number).is_some() {
330                        continue;
331                    }
332                    batch.push(block);
333
334                    // Process batch when full or last block
335                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
336                        let batch_size = batch.len();
337
338                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
339                        metrics.update(batch_size);
340
341                        // Re-initialize batch vector
342                        batch = Vec::with_capacity(BATCH_SIZE);
343                    }
344
345                    // Log progress if needed
346                    if metrics.should_log_progress(block_number, to_block) {
347                        metrics.log_progress(block_number);
348                    }
349                }
350
351                // Process any remaining blocks
352                if !batch.is_empty() {
353                    let batch_size = batch.len();
354                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
355                    metrics.update(batch_size);
356                }
357
358                metrics.log_final_stats();
359                Ok(())
360            } => result
361        };
362
363        sync_result?;
364
365        // Restore default safe settings after sync completion
366        if let Err(e) = self.cache.toggle_performance_settings(false).await {
367            tracing::warn!("Failed to restore default settings: {e}");
368        }
369
370        Ok(())
371    }
372
373    /// Synchronizes all events for a specific pool within the given block range.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if event syncing, parsing, or database operations fail.
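    ///
    /// # Example
    ///
    /// A minimal sketch; the DEX variant and pool address are placeholders. Passing `None` for
    /// both block bounds resumes from the last synced block (or the pool's creation block) up to
    /// the current chain head:
    ///
    /// ```ignore
    /// let pool_address: Address = "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8".parse()?;
    /// core.sync_pool_events(&DexType::UniswapV3, &pool_address, None, None, false).await?;
    /// ```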
378    pub async fn sync_pool_events(
379        &mut self,
380        dex: &DexType,
381        pool_address: &Address,
382        from_block: Option<u64>,
383        to_block: Option<u64>,
384        reset: bool,
385    ) -> anyhow::Result<()> {
386        let pool: SharedPool = self.get_pool(pool_address)?.clone();
387        let pool_display = pool.to_full_spec_string();
388        let from_block = from_block.unwrap_or(pool.creation_block);
389
390        let (last_synced_block, effective_from_block) = if reset {
391            (None, from_block)
392        } else {
393            let last_synced_block = self
394                .cache
395                .get_pool_last_synced_block(dex, pool_address)
396                .await?;
397            let effective_from_block = last_synced_block
398                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
399            (last_synced_block, effective_from_block)
400        };
401
402        let to_block = match to_block {
403            Some(block) => block,
404            None => self.hypersync_client.current_block().await,
405        };
406
407        // Skip sync if we're already up to date
408        if effective_from_block > to_block {
409            tracing::info!(
410                "Pool on DEX {} already synced to block {} (current: {}), skipping sync",
411                dex,
412                last_synced_block.unwrap_or(0).separate_with_commas(),
413                to_block.separate_with_commas()
414            );
415            return Ok(());
416        }
417
418        // Query the max block across the pool event tables so batched INSERTs are used up to that block, then the COPY command beyond it.
419        let last_block_across_pool_events_table = self
420            .cache
421            .get_pool_event_tables_last_block(pool_address)
422            .await?;
423
424        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
425        tracing::info!(
426            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
427            pool_display,
428            effective_from_block.separate_with_commas(),
429            to_block.separate_with_commas(),
430            total_blocks.separate_with_commas(),
431            if let Some(last_synced) = last_synced_block {
432                format!(
433                    " - resuming from last synced block {}",
434                    last_synced.separate_with_commas()
435                )
436            } else {
437                String::new()
438            }
439        );
440
441        let mut metrics = BlockchainSyncReporter::new(
442            BlockchainSyncReportItems::PoolEvents,
443            effective_from_block,
444            total_blocks,
445            BLOCKS_PROCESS_IN_SYNC_REPORT,
446        );
447        let dex_extended = self.get_dex_extended(dex)?.clone();
448        let swap_event_signature = dex_extended.swap_created_event.as_ref();
449        let mint_event_signature = dex_extended.mint_created_event.as_ref();
450        let burn_event_signature = dex_extended.burn_created_event.as_ref();
451        let collect_event_signature = dex_extended.collect_created_event.as_ref();
452        let flash_event_signature = dex_extended.flash_created_event.as_ref();
453        let initialize_event_signature: Option<&str> =
454            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());
455
456        // Pre-decode event signatures to bytes for efficient comparison
457        let swap_sig_bytes = hex::decode(
458            swap_event_signature
459                .strip_prefix("0x")
460                .unwrap_or(swap_event_signature),
461        )?;
462        let mint_sig_bytes = hex::decode(
463            mint_event_signature
464                .strip_prefix("0x")
465                .unwrap_or(mint_event_signature),
466        )?;
467        let burn_sig_bytes = hex::decode(
468            burn_event_signature
469                .strip_prefix("0x")
470                .unwrap_or(burn_event_signature),
471        )?;
472        let collect_sig_bytes = hex::decode(
473            collect_event_signature
474                .strip_prefix("0x")
475                .unwrap_or(collect_event_signature),
476        )?;
477        let flash_sig_bytes = flash_event_signature
478            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
479        let initialize_sig_bytes = initialize_event_signature
480            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
481
482        let mut event_signatures = vec![
483            swap_event_signature,
484            mint_event_signature,
485            burn_event_signature,
486            collect_event_signature,
487        ];
488        if let Some(event) = dex_extended.initialize_event.as_ref() {
489            event_signatures.push(event);
490        }
491        if let Some(event) = dex_extended.flash_created_event.as_ref() {
492            event_signatures.push(event);
493        }
494        let pool_events_stream = self
495            .hypersync_client
496            .request_contract_events_stream(
497                effective_from_block,
498                Some(to_block),
499                pool_address,
500                event_signatures,
501            )
502            .await;
503        tokio::pin!(pool_events_stream);
504
505        let mut last_block_saved = effective_from_block;
506        let mut blocks_processed = 0;
507
508        // Batch configuration for events
509        const EVENT_BATCH_SIZE: usize = 20000;
510        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
511        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
512        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
513        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);
514
515        // Track when we've moved beyond stale data and can use COPY
516        let mut beyond_stale_data = last_block_across_pool_events_table
517            .is_none_or(|tables_max| effective_from_block > tables_max);
518
519        let cancellation_token = self.cancellation_token.clone();
520        let sync_result = tokio::select! {
521            () = cancellation_token.cancelled() => {
522                tracing::info!("Pool event sync cancelled");
523                Err(anyhow::anyhow!("Sync cancelled"))
524            }
525            result = async {
526                while let Some(log) = pool_events_stream.next().await {
527                    let block_number = extract_block_number(&log)?;
528                    blocks_processed += block_number - last_block_saved;
529                    last_block_saved = block_number;
530
531                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
532            if event_sig_bytes == swap_sig_bytes.as_slice() {
533                let swap_event = dex_extended.parse_swap_event(log)?;
534                match self.process_pool_swap_event(&swap_event, &pool) {
535                    Ok(swap) => swap_batch.push(swap),
536                    Err(e) => tracing::error!("Failed to process swap event: {e}"),
537                }
538            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
539                let mint_event = dex_extended.parse_mint_event(log)?;
540                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
541                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
542                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
543                }
544            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
545                let burn_event = dex_extended.parse_burn_event(log)?;
546                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
547                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
548                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
549                }
550            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
551                let collect_event = dex_extended.parse_collect_event(log)?;
552                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
553                    Ok(fee_collect) => collect_batch.push(fee_collect),
554                    Err(e) => tracing::error!("Failed to process collect event: {e}"),
555                }
556            } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
557                let initialize_event = dex_extended.parse_initialize_event(log)?;
558                self.cache
559                    .update_pool_initialize_price_tick(&initialize_event)
560                    .await?;
561            } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
562                if let Some(parse_fn) = dex_extended.parse_flash_event_fn {
563                    match parse_fn(dex_extended.dex.clone(), log) {
564                        Ok(flash_event) => {
565                            match self.process_pool_flash_event(&flash_event, &pool) {
566                                Ok(flash) => flash_batch.push(flash),
567                                Err(e) => tracing::error!("Failed to process flash event: {e}"),
568                            }
569                        }
570                        Err(e) => tracing::error!("Failed to parse flash event: {e}"),
571                    }
572                }
573            } else {
574                let event_signature = hex::encode(event_sig_bytes);
575                tracing::error!(
576                    "Unexpected event signature: {} for log {:?}",
577                    event_signature,
578                    log
579                );
580            }
581
582            // Check if we've moved beyond stale data (transition point for strategy change)
583            if !beyond_stale_data
584                && last_block_across_pool_events_table
585                    .is_some_and(|table_max| block_number > table_max)
586            {
587                tracing::info!(
588                    "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
589                    block_number
590                );
591
592                // Flush all batches with ON CONFLICT to handle any remaining duplicates
593                self.flush_event_batches(
594                    EVENT_BATCH_SIZE,
595                    &mut swap_batch,
596                    &mut liquidity_batch,
597                    &mut collect_batch,
598                    &mut flash_batch,
599                    false,
600                    true,
601                )
602                .await?;
603
604                beyond_stale_data = true;
605                tracing::info!("Switched to COPY mode - future batches will use COPY command");
606            } else {
607                // Process batches when they reach batch size
608                self.flush_event_batches(
609                    EVENT_BATCH_SIZE,
610                    &mut swap_batch,
611                    &mut liquidity_batch,
612                    &mut collect_batch,
613                    &mut flash_batch,
614                    false, // TODO: temporarily don't use the COPY command
615                    false,
616                )
617                .await?;
618            }
619
620            metrics.update(blocks_processed as usize);
621            blocks_processed = 0;
622
623            // Log progress if needed
624            if metrics.should_log_progress(block_number, to_block) {
625                metrics.log_progress(block_number);
626                self.cache
627                    .update_pool_last_synced_block(dex, pool_address, block_number)
628                    .await?;
629            }
630        }
631
632        self.flush_event_batches(
633            EVENT_BATCH_SIZE,
634            &mut swap_batch,
635            &mut liquidity_batch,
636            &mut collect_batch,
637            &mut flash_batch,
638            false,
639            true,
640        )
641        .await?;
642
643        metrics.log_final_stats();
644        self.cache
645            .update_pool_last_synced_block(dex, pool_address, to_block)
646            .await?;
647
648        tracing::info!(
649            "Successfully synced Dex '{}' Pool '{}' up to block {}",
650            dex,
651            pool_display,
652            to_block.separate_with_commas()
653        );
654                Ok(())
655            } => result
656        };
657
658        sync_result
659    }
660
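    /// Flushes buffered pool events to the cache.
    ///
    /// Each batch is written only once it reaches `event_batch_size`, unless `force_flush_all`
    /// is set, in which case any non-empty batch is written regardless of size. Swap, liquidity,
    /// and fee-collect batches honor `use_copy_command`; flash batches always use the regular
    /// insert path. Flushed batches are cleared in place so the caller can reuse the vectors.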
661    #[allow(clippy::too_many_arguments)]
662    async fn flush_event_batches(
663        &mut self,
664        event_batch_size: usize,
665        swap_batch: &mut Vec<PoolSwap>,
666        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
667        collect_batch: &mut Vec<PoolFeeCollect>,
668        flash_batch: &mut Vec<PoolFlash>,
669        use_copy_command: bool,
670        force_flush_all: bool,
671    ) -> anyhow::Result<()> {
672        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
673            self.cache
674                .add_pool_swaps_batch(swap_batch, use_copy_command)
675                .await?;
676            swap_batch.clear();
677        }
678        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
679            && !liquidity_batch.is_empty()
680        {
681            self.cache
682                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
683                .await?;
684            liquidity_batch.clear();
685        }
686        if (force_flush_all || collect_batch.len() >= event_batch_size) && !collect_batch.is_empty()
687        {
688            self.cache
689                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
690                .await?;
691            collect_batch.clear();
692        }
693        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
694            self.cache.add_pool_flash_batch(flash_batch).await?;
695            flash_batch.clear();
696        }
697        Ok(())
698    }
699
700    /// Processes a swap event and converts it to a pool swap.
701    ///
702    /// # Errors
703    ///
704    /// Returns an error if swap event processing fails.
705    ///
706    /// # Panics
707    ///
708    /// Panics if swap event conversion to trade data fails.
709    pub fn process_pool_swap_event(
710        &self,
711        swap_event: &SwapEvent,
712        pool: &SharedPool,
713    ) -> anyhow::Result<PoolSwap> {
714        let timestamp = self
715            .cache
716            .get_block_timestamp(swap_event.block_number)
717            .copied();
718        let mut swap = swap_event.to_pool_swap(
719            self.chain.clone(),
720            pool.instrument_id,
721            pool.address,
722            timestamp,
723        );
724        swap.calculate_trade_info(&pool.token0, &pool.token1, None)?;
725
726        Ok(swap)
727    }
728
729    /// Processes a mint event (liquidity addition) and converts it to a `PoolLiquidityUpdate`.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if mint event processing fails or if the liquidity update creation fails.
734    pub fn process_pool_mint_event(
735        &self,
736        mint_event: &MintEvent,
737        pool: &SharedPool,
738        dex_extended: &DexExtended,
739    ) -> anyhow::Result<PoolLiquidityUpdate> {
740        let timestamp = self
741            .cache
742            .get_block_timestamp(mint_event.block_number)
743            .copied();
744
745        let liquidity_update = mint_event.to_pool_liquidity_update(
746            self.chain.clone(),
747            dex_extended.dex.clone(),
748            pool.instrument_id,
749            pool.address,
750            timestamp,
751        );
752
753        // self.cache.add_liquidity_update(&liquidity_update).await?;
754
755        Ok(liquidity_update)
756    }
757
758    /// Processes a burn event (liquidity removal) and converts it to a `PoolLiquidityUpdate`.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the burn event processing fails or if the liquidity update creation fails.
764    pub fn process_pool_burn_event(
765        &self,
766        burn_event: &BurnEvent,
767        pool: &SharedPool,
768        dex_extended: &DexExtended,
769    ) -> anyhow::Result<PoolLiquidityUpdate> {
770        let timestamp = self
771            .cache
772            .get_block_timestamp(burn_event.block_number)
773            .copied();
774
775        let liquidity_update = burn_event.to_pool_liquidity_update(
776            self.chain.clone(),
777            dex_extended.dex.clone(),
778            pool.instrument_id,
779            pool.address,
780            timestamp,
781        );
782
783        // self.cache.add_liquidity_update(&liquidity_update).await?;
784
785        Ok(liquidity_update)
786    }
787
788    /// Processes a pool collect event and converts it to a fee collection.
789    ///
790    /// # Errors
791    ///
792    /// Returns an error if the collect event processing fails or if the fee collection creation fails.
793    pub fn process_pool_collect_event(
794        &self,
795        collect_event: &CollectEvent,
796        pool: &SharedPool,
797        dex_extended: &DexExtended,
798    ) -> anyhow::Result<PoolFeeCollect> {
799        let timestamp = self
800            .cache
801            .get_block_timestamp(collect_event.block_number)
802            .copied();
803
804        let fee_collect = collect_event.to_pool_fee_collect(
805            self.chain.clone(),
806            dex_extended.dex.clone(),
807            pool.instrument_id,
808            pool.address,
809            timestamp,
810        );
811
812        Ok(fee_collect)
813    }
814
815    /// Processes a pool flash event and converts it to a flash loan.
816    ///
817    /// # Errors
818    ///
819    /// Returns an error if the flash event processing fails or if the flash loan creation fails.
820    pub fn process_pool_flash_event(
821        &self,
822        flash_event: &FlashEvent,
823        pool: &SharedPool,
824    ) -> anyhow::Result<PoolFlash> {
825        let timestamp = self
826            .cache
827            .get_block_timestamp(flash_event.block_number)
828            .copied();
829
830        let flash = flash_event.to_pool_flash(
831            self.chain.clone(),
832            pool.instrument_id,
833            pool.address,
834            timestamp,
835        );
836
837        Ok(flash)
838    }
839
840    /// Synchronizes all pools and their tokens for a specific DEX within the given block range.
841    ///
842    /// This method performs a comprehensive sync of:
843    /// 1. Pool creation events from the DEX factory
844    /// 2. Token metadata for all tokens in discovered pools
845    /// 3. Pool entities with proper token associations
846    ///
847    /// # Errors
848    ///
849    /// Returns an error if syncing pools, tokens, or DEX operations fail.
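    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the DEX was registered via `register_dex_exchange` beforehand
    /// and that a `DexType::UniswapV3` variant exists; the starting block is illustrative:
    ///
    /// ```ignore
    /// core.sync_exchange_pools(&DexType::UniswapV3, 12_369_621, None, false).await?;
    /// ```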
850    pub async fn sync_exchange_pools(
851        &mut self,
852        dex: &DexType,
853        from_block: u64,
854        to_block: Option<u64>,
855        reset: bool,
856    ) -> anyhow::Result<()> {
857        // Check for last synced block and use it as starting point if higher (unless reset is true)
858        let (last_synced_block, effective_from_block) = if reset {
859            (None, from_block)
860        } else {
861            let last_synced_block = self.cache.get_dex_last_synced_block(dex).await?;
862            let effective_from_block = last_synced_block
863                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
864            (last_synced_block, effective_from_block)
865        };
866
867        let to_block = match to_block {
868            Some(block) => block,
869            None => self.hypersync_client.current_block().await,
870        };
871
872        // Skip sync if we're already up to date
873        if effective_from_block > to_block {
874            tracing::info!(
875                "DEX {} already synced to block {} (current: {}), skipping sync",
876                dex,
877                last_synced_block.unwrap_or(0).separate_with_commas(),
878                to_block.separate_with_commas()
879            );
880            return Ok(());
881        }
882
883        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
884        tracing::info!(
885            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
886            effective_from_block.separate_with_commas(),
887            to_block.separate_with_commas(),
888            total_blocks.separate_with_commas(),
889            if let Some(last_synced) = last_synced_block {
890                format!(
891                    " - resuming from last synced block {}",
892                    last_synced.separate_with_commas()
893                )
894            } else {
895                String::new()
896            }
897        );
898
899        // Enable performance settings for sync operations
900        if let Err(e) = self.cache.toggle_performance_settings(true).await {
901            tracing::warn!("Failed to enable performance settings: {e}");
902        }
903
904        let mut metrics = BlockchainSyncReporter::new(
905            BlockchainSyncReportItems::PoolCreatedEvents,
906            effective_from_block,
907            total_blocks,
908            BLOCKS_PROCESS_IN_SYNC_REPORT,
909        );
910
911        let dex = self.get_dex_extended(dex)?.clone();
912        let factory_address = &dex.factory;
913        let pair_created_event_signature = dex.pool_created_event.as_ref();
914        let pools_stream = self
915            .hypersync_client
916            .request_contract_events_stream(
917                effective_from_block,
918                Some(to_block),
919                factory_address,
920                vec![pair_created_event_signature],
921            )
922            .await;
923
924        tokio::pin!(pools_stream);
925
926        // LEVEL 1: RPC buffers (small, constrained by rate limits)
927        let token_rpc_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
928        let mut token_rpc_buffer: HashSet<Address> = HashSet::new();
929
930        // LEVEL 2: DB buffers (large, optimize for throughput)
931        const POOL_DB_BATCH_SIZE: usize = 2000;
932        let mut token_db_buffer: Vec<Token> = Vec::new();
933        let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();
934
935        let mut last_block_saved = effective_from_block;
936
937        let cancellation_token = self.cancellation_token.clone();
938        let sync_result = tokio::select! {
939            () = cancellation_token.cancelled() => {
940                tracing::info!("Exchange pool sync cancelled");
941                Err(anyhow::anyhow!("Sync cancelled"))
942            }
943            result = async {
944                while let Some(log) = pools_stream.next().await {
945                    let block_number = extract_block_number(&log)?;
946                    let blocks_progress = block_number - last_block_saved;
947                    last_block_saved = block_number;
948
949                    let pool = dex.parse_pool_created_event(log)?;
950                    if self.cache.get_pool(&pool.pool_address).is_some() {
951                        // Pool is already initialized and cached.
952                        continue;
953                    }
954
955                    if self.cache.is_invalid_token(&pool.token0)
956                        || self.cache.is_invalid_token(&pool.token1)
957                    {
958                        // Skip pools with invalid tokens as they cannot be properly processed or traded.
959                        continue;
960                    }
961
962                    // Collect tokens needed for RPC fetch
963                    if self.cache.get_token(&pool.token0).is_none() {
964                        token_rpc_buffer.insert(pool.token0);
965                    }
966                    if self.cache.get_token(&pool.token1).is_none() {
967                        token_rpc_buffer.insert(pool.token1);
968                    }
969
970                    // Buffer the pool for later processing
971                    pool_events_buffer.push(pool);
972
973                    // ==== RPC FLUSHING (small batches) ====
974                    if token_rpc_buffer.len() >= token_rpc_batch_size {
975                        let fetched_tokens = self
976                            .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
977                            .await?;
978
979                        // Accumulate for later DB write
980                        token_db_buffer.extend(fetched_tokens);
981                    }
982
983                    // ==== DB FLUSHING (large batches) ====
984                    // Process pools when buffer is full
985                    if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
986                        // 1. Fetch any remaining tokens in RPC buffer (needed for pool construction)
987                        if !token_rpc_buffer.is_empty() {
988                            let fetched_tokens = self
989                                .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
990                                .await?;
991                            token_db_buffer.extend(fetched_tokens);
992                        }
993
994                        // 2. Flush ALL tokens to DB (satisfy foreign key constraints)
995                        if !token_db_buffer.is_empty() {
996                            self.cache
997                                .add_tokens_batch(std::mem::take(&mut token_db_buffer))
998                                .await?;
999                        }
1000
1001                        // 3. Now safe to construct and flush pools
1002                        let pools = self
1003                            .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
1004                            .await?;
1005                        self.cache.add_pools_batch(pools).await?;
1006                    }
1007
1008                    metrics.update(blocks_progress as usize);
1009                    // Log progress if needed
1010                    if metrics.should_log_progress(block_number, to_block) {
1011                        metrics.log_progress(block_number);
1012                    }
1013                }
1014
1015                // ==== FINAL FLUSH (all remaining data) ====
1016                // 1. Fetch any remaining tokens
1017                if !token_rpc_buffer.is_empty() {
1018                    let fetched_tokens = self
1019                        .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
1020                        .await?;
1021                    token_db_buffer.extend(fetched_tokens);
1022                }
1023
1024                // 2. Flush all tokens to DB
1025                if !token_db_buffer.is_empty() {
1026                    self.cache
1027                        .add_tokens_batch(std::mem::take(&mut token_db_buffer))
1028                        .await?;
1029                }
1030
1031                // 3. Process and flush all pools
1032                if !pool_events_buffer.is_empty() {
1033                    let pools = self
1034                        .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
1035                        .await?;
1036                    self.cache.add_pools_batch(pools).await?;
1037                }
1038
1039                metrics.log_final_stats();
1040
1041                // Update the last synced block after successful completion.
1042                self.cache
1043                    .update_dex_last_synced_block(&dex.dex.name, to_block)
1044                    .await?;
1045
1046                tracing::info!(
1047                    "Successfully synced DEX {} pools up to block {}",
1048                    dex.dex.name,
1049                    to_block.separate_with_commas()
1050                );
1051
1052                Ok(())
1053            } => result
1054        };
1055
1056        sync_result?;
1057
1058        // Restore default safe settings after sync completion
1059        if let Err(e) = self.cache.toggle_performance_settings(false).await {
1060            tracing::warn!("Failed to restore default settings: {e}");
1061        }
1062
1063        Ok(())
1064    }
1065
1066    /// Fetches token metadata via RPC and updates in-memory cache immediately.
1067    ///
1068    /// This method fetches token information using multicall, updates the in-memory cache right away
1069    /// (so pool construction can proceed), and returns valid tokens for later batch DB writes.
1070    ///
1071    /// # Errors
1072    ///
1073    /// Returns an error if the RPC multicall fails or database operations fail.
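    ///
    /// # Example
    ///
    /// A minimal internal-usage sketch; the token addresses and the `token_db_buffer` vector are
    /// placeholders for the caller's buffers:
    ///
    /// ```ignore
    /// let mut token_rpc_buffer: HashSet<Address> = HashSet::from([token0_address, token1_address]);
    /// let fetched_tokens = self.fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer).await?;
    /// token_db_buffer.extend(fetched_tokens);
    /// ```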
1074    async fn fetch_and_cache_tokens_in_memory(
1075        &mut self,
1076        token_buffer: &mut HashSet<Address>,
1077    ) -> anyhow::Result<Vec<Token>> {
1078        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
1079        let token_infos = self.tokens.batch_fetch_token_info(&batch_addresses).await?;
1080
1081        let mut valid_tokens = Vec::new();
1082
1083        for (token_address, token_info) in token_infos {
1084            match token_info {
1085                Ok(token_info) => {
1086                    let token = Token::new(
1087                        self.chain.clone(),
1088                        token_address,
1089                        token_info.name,
1090                        token_info.symbol,
1091                        token_info.decimals,
1092                    );
1093
1094                    // Update in-memory cache IMMEDIATELY (so construct_pools_batch can read it)
1095                    self.cache.insert_token_in_memory(token.clone());
1096
1097                    // Collect for LATER DB write
1098                    valid_tokens.push(token);
1099                }
1100                Err(token_info_error) => {
1101                    self.cache.insert_invalid_token_in_memory(token_address);
1102                    if let Some(database) = &self.cache.database {
1103                        database
1104                            .add_invalid_token(
1105                                self.chain.chain_id,
1106                                &token_address,
1107                                &token_info_error.to_string(),
1108                            )
1109                            .await?;
1110                    }
1111                }
1112            }
1113        }
1114
1115        Ok(valid_tokens)
1116    }
1117
1118    /// Constructs multiple pools from pool creation events.
1119    ///
1120    /// Assumes all required tokens are already in the in-memory cache.
1121    ///
1122    /// # Errors
1123    ///
1124    /// Logs errors for pools that cannot be constructed (missing tokens),
1125    /// but does not fail the entire batch.
1126    async fn construct_pools_batch(
1127        &mut self,
1128        pool_events: &mut Vec<PoolCreatedEvent>,
1129        dex: &SharedDex,
1130    ) -> anyhow::Result<Vec<Pool>> {
1131        let mut pools = Vec::with_capacity(pool_events.len());
1132
1133        for pool_event in pool_events.drain(..) {
1134            // Both tokens should be in cache now
1135            let token0 = match self.cache.get_token(&pool_event.token0) {
1136                Some(token) => token.clone(),
1137                None => {
1138                    if !self.cache.is_invalid_token(&pool_event.token0) {
1139                        tracing::warn!(
1140                            "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
1141                            pool_event.pool_address,
1142                            pool_event.token0
1143                        );
1144                    }
1145                    continue;
1146                }
1147            };
1148
1149            let token1 = match self.cache.get_token(&pool_event.token1) {
1150                Some(token) => token.clone(),
1151                None => {
1152                    if !self.cache.is_invalid_token(&pool_event.token1) {
1153                        tracing::warn!(
1154                            "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
1155                            pool_event.pool_address,
1156                            pool_event.token1
1157                        );
1158                    }
1159                    continue;
1160                }
1161            };
1162
1163            let pool = Pool::new(
1164                self.chain.clone(),
1165                dex.clone(),
1166                pool_event.pool_address,
1167                pool_event.block_number,
1168                token0,
1169                token1,
1170                pool_event.fee,
1171                pool_event.tick_spacing,
1172                UnixNanos::default(),
1173            );
1174
1175            pools.push(pool);
1176        }
1177
1178        Ok(pools)
1179    }
1180
1181    /// Registers a decentralized exchange for data collection and event monitoring.
1182    ///
1183    /// Registration involves:
1184    /// 1. Adding the DEX to the cache
1185    /// 2. Loading existing pools for the DEX
1186    /// 3. Configuring event signatures for subscriptions
1187    ///
1188    /// # Errors
1189    ///
1190    /// Returns an error if DEX registration, cache operations, or pool loading fails.
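    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `DexType::UniswapV3` variant is available for the configured
    /// chain:
    ///
    /// ```ignore
    /// core.register_dex_exchange(DexType::UniswapV3).await?;
    /// ```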
1191    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
1192        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
1193            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);
1194
1195            self.cache.add_dex(dex_extended.dex.clone()).await?;
1196            let _ = self.cache.load_pools(&dex_id).await?;
1197
1198            self.subscription_manager.register_dex_for_subscriptions(
1199                dex_id,
1200                dex_extended.swap_created_event.as_ref(),
1201                dex_extended.mint_created_event.as_ref(),
1202                dex_extended.burn_created_event.as_ref(),
1203                dex_extended.collect_created_event.as_ref(),
1204                dex_extended.flash_created_event.as_deref(),
1205            );
1206            Ok(())
1207        } else {
1208            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
1209        }
1210    }
1211
1212    /// Bootstraps a [`PoolProfiler`] with the latest state for a given pool.
1213    ///
1214    /// Uses two paths depending on whether the pool has been synced to the database:
1215    /// - **Never synced**: Streams events from HyperSync → restores from on-chain RPC → returns `(profiler, true)`
1216    /// - **Previously synced**: Syncs new events to DB → streams from DB → returns `(profiler, false)`
1217    ///
1218    /// Both paths restore from the latest valid snapshot first (if available); otherwise they initialize with the pool's initial price.
1219    ///
1220    /// # Returns
1221    ///
1222    /// - `PoolProfiler`: Hydrated profiler with current pool state
1223    /// - `bool`: `true` if constructed from RPC (already valid), `false` if from DB (needs validation)
1224    ///
1225    /// # Errors
1226    ///
1227    /// Returns an error if database is not initialized or event processing fails.
1228    ///
1229    /// # Panics
1230    ///
1231    /// Panics if the database reference is unavailable.
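    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `pool` is a cached `SharedPool`; the returned flag indicates
    /// whether the state was built from RPC (already valid) or from the database (still to be
    /// validated):
    ///
    /// ```ignore
    /// let (profiler, from_rpc) = core.bootstrap_latest_pool_profiler(&pool).await?;
    /// if !from_rpc {
    ///     // Database-derived state may still need validation against on-chain data.
    /// }
    /// ```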
1232    pub async fn bootstrap_latest_pool_profiler(
1233        &mut self,
1234        pool: &SharedPool,
1235    ) -> anyhow::Result<(PoolProfiler, bool)> {
1236        tracing::info!(
1237            "Bootstrapping latest pool profiler for pool {}",
1238            pool.address
1239        );
1240
1241        if self.cache.database.is_none() {
1242            anyhow::bail!(
1243                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
1244            );
1245        }
1246
1247        let mut profiler = PoolProfiler::new(pool.clone());
1248
1249        // Calculate latest valid block position after which we need to start profiling.
1250        let from_position = match self
1251            .cache
1252            .database
1253            .as_ref()
1254            .unwrap()
1255            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.address)
1256            .await
1257        {
1258            Ok(Some(snapshot)) => {
1259                tracing::info!(
1260                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
1261                    snapshot.block_position.number.separate_with_commas(),
1262                    snapshot.positions.len(),
1263                    snapshot.ticks.len()
1264                );
1265                let block_position = snapshot.block_position.clone();
1266                profiler.restore_from_snapshot(snapshot)?;
1267                tracing::info!("Restored profiler from snapshot");
1268                Some(block_position)
1269            }
1270            _ => {
1271                tracing::info!("No valid snapshot found, processing from beginning");
1272                None
1273            }
1274        };
1275
1276        // If the pool's events have never been synced, proceed with the faster
1277        // construction of the pool profiler from the HyperSync stream and RPC,
1278        // which avoids syncing pool events and fetching them from the database.
1279        if self
1280            .cache
1281            .database
1282            .as_ref()
1283            .unwrap()
1284            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.address)
1285            .await?
1286            .is_none()
1287        {
1288            return self
1289                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
1290                .await;
1291        }
1292
1293        // Sync the pool events before bootstrapping the pool profiler.
1294        if let Err(e) = self
1295            .sync_pool_events(&pool.dex.name, &pool.address, None, None, false)
1296            .await
1297        {
1298            tracing::error!("Failed to sync pool events for snapshot request: {}", e);
1299        }
1300
1301        if !profiler.is_initialized {
1302            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
1303                profiler.initialize(initial_sqrt_price_x96);
1304            } else {
1305                anyhow::bail!(
1306                    "Pool is not initialized and has no initial price, cannot bootstrap profiler"
1307                );
1308            }
1309        }
1310
1311        let from_block = from_position
1312            .as_ref()
1313            .map_or(profiler.pool.creation_block, |block_position| {
1314                block_position.number
1315            });
1316        let to_block = self.hypersync_client.current_block().await;
1317        let total_blocks = to_block.saturating_sub(from_block) + 1;
1318
1319        // Enable embedded profiler reporting
1320        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1321
1322        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
1323            pool.chain.clone(),
1324            pool.dex.clone(),
1325            pool.instrument_id,
1326            &pool.address,
1327            from_position.clone(),
1328        );
1329
1330        while let Some(result) = stream.next().await {
1331            match result {
1332                Ok(event) => {
1333                    profiler.process(&event)?;
1334                }
1335                Err(e) => log::error!("Error processing event: {}", e),
1336            }
1337        }
1338
1339        profiler.finalize_reporting();
1340
1341        Ok((profiler, false))
1342    }
1343
1344    /// Constructs a pool profiler by streaming events from HyperSync and querying the final state via RPC.
1345    ///
1346    /// This method is used when the pool has never been synced to the database. It streams
1347    /// liquidity events (mints, burns) and the pool initialize event directly from HyperSync
1348    /// and processes them to build up the profiler's state in real time. After processing all
1349    /// events, it restores the profiler from the current on-chain state with the provided ticks and positions.
1350    ///
1351    /// # Returns
1352    ///
1353    /// Returns a tuple of:
1354    /// - `PoolProfiler`: The hydrated profiler with state built from events
1355    /// - `bool`: Always `true`, indicating the profiler state was validated during construction from RPC
1356    ///
1357    /// # Errors
1358    ///
1359    /// Returns an error if:
1360    /// - Event streaming from HyperSync fails
1361    /// - Event parsing or processing fails
1362    /// - DEX configuration is invalid
1363    async fn construct_pool_profiler_from_hypersync_rpc(
1364        &self,
1365        mut profiler: PoolProfiler,
1366        from_position: Option<BlockPosition>,
1367    ) -> anyhow::Result<(PoolProfiler, bool)> {
1368        tracing::info!(
1369            "Constructing pool profiler from the HyperSync event stream and a final RPC state query"
1370        );
1371        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
1372        let mint_event_signature = dex_extended.mint_created_event.as_ref();
1373        let burn_event_signature = dex_extended.burn_created_event.as_ref();
1374        let initialize_event_signature =
1375            if let Some(initialize_event) = &dex_extended.initialize_event {
1376                initialize_event.as_ref()
1377            } else {
1378                anyhow::bail!(
1379                    "DEX {} does not have initialize event set.",
1380                    &profiler.pool.dex.name
1381                );
1382            };
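        // Pre-decode the hex event signatures into raw bytes so streamed logs can be matched by cheap byte comparison.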
1383        let mint_sig_bytes = hex::decode(
1384            mint_event_signature
1385                .strip_prefix("0x")
1386                .unwrap_or(mint_event_signature),
1387        )?;
1388        let burn_sig_bytes = hex::decode(
1389            burn_event_signature
1390                .strip_prefix("0x")
1391                .unwrap_or(burn_event_signature),
1392        )?;
1393        let initialize_sig_bytes = hex::decode(
1394            initialize_event_signature
1395                .strip_prefix("0x")
1396                .unwrap_or(initialize_event_signature),
1397        )?;
1398
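        // Start from the provided block position if one exists, otherwise from the pool's creation block.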
1399        let from_block = from_position.map_or(profiler.pool.creation_block, |block_position| {
1400            block_position.number
1401        });
1402        let to_block = self.hypersync_client.current_block().await;
1403        let total_blocks = to_block.saturating_sub(from_block) + 1;
1404
1405        tracing::info!(
1406            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
1407            profiler.pool.address,
1408            from_block.separate_with_commas(),
1409            to_block.separate_with_commas(),
1410            total_blocks.separate_with_commas()
1411        );
1412
1413        // Enable embedded profiler reporting
1414        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1415
1416        let pool_events_stream = self
1417            .hypersync_client
1418            .request_contract_events_stream(
1419                from_block,
1420                None,
1421                &profiler.pool.address,
1422                vec![
1423                    mint_event_signature,
1424                    burn_event_signature,
1425                    initialize_event_signature,
1426                ],
1427            )
1428            .await;
1429        tokio::pin!(pool_events_stream);
1430
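        // Dispatch each streamed log by its event signature: Initialize sets the starting price, Mint/Burn become liquidity updates.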
1431        while let Some(log) = pool_events_stream.next().await {
1432            let event_sig_bytes = extract_event_signature_bytes(&log)?;
1433
1434            if event_sig_bytes == initialize_sig_bytes {
1435                let initialize_event = dex_extended.parse_initialize_event(log)?;
1436                profiler.initialize(initialize_event.sqrt_price_x96);
1437                self.cache
1438                    .database
1439                    .as_ref()
1440                    .unwrap()
1441                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
1442                    .await?;
1443            } else if event_sig_bytes == mint_sig_bytes {
1444                let mint_event = dex_extended.parse_mint_event(log)?;
1445                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
1446                    Ok(liquidity_update) => {
1447                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1448                    }
1449                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
1450                }
1451            } else if event_sig_bytes == burn_sig_bytes {
1452                let burn_event = dex_extended.parse_burn_event(log)?;
1453                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
1454                    Ok(liquidity_update) => {
1455                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1456                    }
1457                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
1458                }
1459            } else {
1460                let event_signature = hex::encode(event_sig_bytes);
1461                tracing::error!(
1462                    "Unexpected event signature in construct_pool_profiler_from_hypersync_rpc: {} for log {:?}",
1463                    event_signature,
1464                    log
1465                );
1466            }
1467        }
1468
1469        profiler.finalize_reporting();
1470
1471        // Hydrate from the current RPC state
1472        match self.get_on_chain_snapshot(&profiler).await {
1473            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
1474            Err(e) => tracing::error!(
1475                "Failed to restore from on-chain snapshot: {e}. Sending non-hydrated state to the client."
1476            ),
1477        }
1478
1479        Ok((profiler, true))
1480    }
1481
1482    /// Validates a pool profiler's state against on-chain data for accuracy verification.
1483    ///
1484    /// This method performs integrity checking by comparing the profiler's internal state
1485    /// (positions, ticks, liquidity) with the actual on-chain smart contract state. For UniswapV3
1486    /// pools, it fetches current on-chain data and verifies that the profiler's tracked state matches.
1487    /// If validation succeeds or is bypassed, the snapshot is marked as valid in the database.
1488    ///
1489    /// # Errors
1490    ///
1491    /// Returns an error if database operations fail when marking the snapshot as valid.
1492    ///
1493    /// # Panics
1494    ///
1495    /// Panics if the profiler does not have a `last_processed_event` when `already_validated` is `true`.
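    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doc-test), assuming an already constructed
    /// client `core` and a bootstrapped `profiler`:
    ///
    /// ```ignore
    /// let is_valid = core.check_snapshot_validity(&profiler, false).await?;
    /// if !is_valid {
    ///     tracing::warn!("Profiler state diverged from on-chain state");
    /// }
    /// ```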
1496    pub async fn check_snapshot_validity(
1497        &self,
1498        profiler: &PoolProfiler,
1499        already_validated: bool,
1500    ) -> anyhow::Result<bool> {
1501        // Determine validity and get block position for marking
1502        let (is_valid, block_position) = if already_validated {
1503            // Skip RPC call - profiler was validated during construction from RPC
1504            tracing::info!("Snapshot already validated from RPC, skipping on-chain comparison");
1505            let last_event = profiler
1506                .last_processed_event
1507                .clone()
1508                .expect("Profiler should have last_processed_event");
1509            (true, last_event)
1510        } else {
1511            // Fetch on-chain state and compare
1512            match self.get_on_chain_snapshot(profiler).await {
1513                Ok(on_chain_snapshot) => {
1514                    tracing::info!("Comparing profiler state with on-chain state...");
1515                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
1516                    if !valid {
1517                        tracing::error!(
1518                            "Pool profiler state does NOT match on-chain smart contract state"
1519                        );
1520                    }
1521                    (valid, on_chain_snapshot.block_position)
1522                }
1523                Err(e) => {
1524                    tracing::error!("Failed to check snapshot validity: {e}");
1525                    return Ok(false);
1526                }
1527            }
1528        };
1529
1530        // Mark snapshot as valid in database if validation passed
1531        if is_valid && let Some(cache_database) = &self.cache.database {
1532            cache_database
1533                .mark_pool_snapshot_valid(
1534                    profiler.pool.chain.chain_id,
1535                    &profiler.pool.address,
1536                    block_position.number,
1537                    block_position.transaction_index,
1538                    block_position.log_index,
1539                )
1540                .await?;
1541            tracing::info!("Marked pool profiler snapshot as valid");
1542        }
1543
1544        Ok(is_valid)
1545    }
1546
1547    /// Fetches current on-chain pool state at the last processed block.
1548    ///
1549    /// Queries the pool smart contract to retrieve active tick liquidity and position data,
1550    /// using the profiler's active positions and last processed block number.
1551    /// Used to restore profiler state after bootstrapping and to validate it against on-chain data.
1552    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
1553        if profiler.pool.dex.name == DexType::UniswapV3 {
1554            let last_processed_event = profiler
1555                .last_processed_event
1556                .clone()
1557                .expect("We expect at least one processed event in the pool");
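            // Fetch on-chain state for the ticks and positions the profiler currently tracks, using the last processed event as the reference block position.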
1558            let on_chain_snapshot = self
1559                .univ3_pool
1560                .fetch_snapshot(
1561                    &profiler.pool.address,
1562                    profiler.pool.instrument_id,
1563                    profiler.get_active_tick_values().as_slice(),
1564                    &profiler.get_all_position_keys(),
1565                    last_processed_event,
1566                )
1567                .await?;
1568
1569            Ok(on_chain_snapshot)
1570        } else {
1571            anyhow::bail!(
1572                "Fetching on-chain snapshot for DEX protocol {} is not supported yet.",
1573                profiler.pool.dex.name
1574            )
1575        }
1576    }
1577
1578    /// Replays historical events for a pool to hydrate its profiler state.
1579    ///
1580    /// Streams all historical swap, liquidity, and fee collect events from the database
1581    /// and sends them through the normal data event pipeline to build up pool profiler state.
1582    ///
1583    /// # Errors
1584    ///
1585    /// Returns an error if database streaming fails or event processing fails.
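    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doc-test), assuming `core` is a constructed
    /// client and `pool_address` refers to a pool already registered in the cache:
    ///
    /// ```ignore
    /// let pool = core.get_pool(&pool_address)?.clone();
    /// core.replay_pool_events(&pool, &pool.dex).await?;
    /// ```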
1586    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
1587        if let Some(database) = &self.cache.database {
1588            tracing::info!(
1589                "Replaying historical events for pool {} to hydrate profiler",
1590                pool.instrument_id
1591            );
1592
1593            let mut event_stream = database.stream_pool_events(
1594                self.chain.clone(),
1595                dex.clone(),
1596                pool.instrument_id,
1597                &pool.address,
1598                None,
1599            );
1600            let mut event_count = 0;
1601
1602            while let Some(event_result) = event_stream.next().await {
1603                match event_result {
1604                    Ok(event) => {
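                        // Wrap each cached pool event in the DataEvent envelope expected by the data pipeline.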
1605                        let data_event = match event {
1606                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
1607                            DexPoolData::LiquidityUpdate(update) => {
1608                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
1609                            }
1610                            DexPoolData::FeeCollect(collect) => {
1611                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
1612                            }
1613                            DexPoolData::Flash(flash) => {
1614                                DataEvent::DeFi(DefiData::PoolFlash(flash))
1615                            }
1616                        };
1617                        self.send_data(data_event);
1618                        event_count += 1;
1619                    }
1620                    Err(e) => {
1621                        tracing::error!(
1622                            "Error streaming event for pool {}: {e}",
1623                            pool.instrument_id
1624                        );
1625                    }
1626                }
1627            }
1628
1629            tracing::info!(
1630                "Replayed {event_count} historical events for pool {}",
1631                pool.instrument_id
1632            );
1633        } else {
1634            tracing::debug!(
1635                "No database available, skipping event replay for pool {}",
1636                pool.instrument_id
1637            );
1638        }
1639
1640        Ok(())
1641    }
1642
1643    /// Determines the starting block for syncing operations, preferring the configured `from_block` and falling back to the earliest DEX creation block in the cache (or 0).
1644    fn determine_from_block(&self) -> u64 {
1645        self.config
1646            .from_block
1647            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
1648    }
1649
1650    /// Retrieves extended DEX information for a registered DEX.
1651    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
1652        if !self.cache.get_registered_dexes().contains(dex_id) {
1653            anyhow::bail!("DEX {dex_id} is not registered in the data client");
1654        }
1655
1656        match get_dex_extended(self.chain.name, dex_id) {
1657            Some(dex) => Ok(dex),
1658            None => anyhow::bail!("DEX {dex_id} doesn't exist for chain {}", self.chain.name),
1659        }
1660    }
1661
1662    /// Retrieves a pool from the cache by its address.
1663    ///
1664    /// # Errors
1665    ///
1666    /// Returns an error if the pool is not registered in the cache.
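    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doc-test), assuming `core` is a constructed
    /// client and `pool_address` is the `Address` of a pool already added to the cache:
    ///
    /// ```ignore
    /// let pool = core.get_pool(&pool_address)?;
    /// tracing::info!("Resolved pool {}", pool.instrument_id);
    /// ```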
1667    pub fn get_pool(&self, pool_address: &Address) -> anyhow::Result<&SharedPool> {
1668        match self.cache.get_pool(pool_address) {
1669            Some(pool) => Ok(pool),
1670            None => anyhow::bail!("Pool {pool_address} is not registered"),
1671        }
1672    }
1673
1674    /// Sends a data event to all subscribers through the data channel.
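    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doc-test), assuming `core` is a constructed
    /// client and `swap` is a `PoolSwap` produced elsewhere by this client:
    ///
    /// ```ignore
    /// core.send_data(DataEvent::DeFi(DefiData::PoolSwap(swap)));
    /// ```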
1675    pub fn send_data(&self, data: DataEvent) {
1676        if let Some(data_tx) = &self.data_tx {
1677            tracing::debug!("Sending {data}");
1678
1679            if let Err(e) = data_tx.send(data) {
1680                tracing::error!("Failed to send data: {e}");
1681            }
1682        } else {
1683            tracing::error!("No data event channel for sending data");
1684        }
1685    }
1686
1687    /// Disconnects all active connections and cleans up resources.
1688    ///
1689    /// This method should be called when shutting down the client to ensure
1690    /// proper cleanup of network connections and background tasks.
1691    pub async fn disconnect(&mut self) {
1692        self.hypersync_client.disconnect().await;
1693    }
1694}