nautilus_blockchain/data/core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp::max, collections::HashSet, sync::Arc};
17
18use alloy::primitives::Address;
19use futures_util::StreamExt;
20use nautilus_common::messages::DataEvent;
21use nautilus_core::UnixNanos;
22use nautilus_model::defi::{
23    Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolProfiler, PoolSwap, SharedChain,
24    SharedDex, SharedPool, Token,
25    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
26    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
27    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
28};
29use thousands::Separable;
30
31use crate::{
32    cache::BlockchainCache,
33    config::BlockchainDataClientConfig,
34    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
35    data::subscription::DefiDataSubscriptionManager,
36    events::{
37        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent,
38        pool_created::PoolCreatedEvent, swap::SwapEvent,
39    },
40    exchanges::{extended::DexExtended, get_dex_extended},
41    hypersync::{
42        client::HyperSyncClient,
43        helpers::{extract_block_number, extract_event_signature_bytes},
44    },
45    rpc::{
46        BlockchainRpcClient, BlockchainRpcClientAny,
47        chains::{
48            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
49            polygon::PolygonRpcClient,
50        },
51        http::BlockchainHttpRpcClient,
52        types::BlockchainMessage,
53    },
54};
55
/// Block interval handed to every [`BlockchainSyncReporter`] created in this module
/// (presumably how many blocks elapse between sync progress reports — confirm in the reporter).
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
57
/// Core blockchain data client responsible for fetching, processing, and caching blockchain data.
///
/// This struct encapsulates the core functionality for interacting with blockchain networks,
/// including syncing historical data, processing real-time events, and managing cached entities.
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain being targeted by this client instance.
    pub chain: SharedChain,
    /// The configuration for the data client.
    pub config: BlockchainDataClientConfig,
    /// Local cache for blockchain entities, optionally backed by a Postgres database
    /// (see `postgres_cache_database_config`).
    pub cache: BlockchainCache,
    /// Interface for interacting with ERC20 token contracts over the HTTP RPC client.
    tokens: Erc20Contract,
    /// Interface for interacting with UniswapV3 pool contracts over the HTTP RPC client.
    univ3_pool: UniswapV3PoolContract,
    /// Client for the HyperSync data indexing service; used here for block and contract-event
    /// streams and for querying the current block height.
    pub hypersync_client: HyperSyncClient,
    /// Optional WebSocket RPC client for direct blockchain node communication.
    ///
    /// Only created when `use_hypersync_for_live_data` is false and a `wss_rpc_url` is configured.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manages subscriptions for DEX data (e.g. swaps, mints, burns).
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel sender for data events, if a consumer was provided at construction.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    /// Cancellation token for graceful shutdown; the block and pool-event sync loops stop
    /// with a "Sync cancelled" error when it fires.
    cancellation_token: tokio_util::sync::CancellationToken,
}
85
86impl BlockchainDataClientCore {
87    /// Creates a new instance of [`BlockchainDataClientCore`].
88    ///
89    /// # Panics
90    ///
91    /// Panics if `use_hypersync_for_live_data` is false but `wss_rpc_url` is None.
92    #[must_use]
93    pub fn new(
94        config: BlockchainDataClientConfig,
95        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
96        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
97        cancellation_token: tokio_util::sync::CancellationToken,
98    ) -> Self {
99        let chain = config.chain.clone();
100        let cache = BlockchainCache::new(chain.clone());
101
102        // Log RPC endpoints being used
103        tracing::info!(
104            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
105            chain.name,
106            config.http_rpc_url
107        );
108
109        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
110            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
111            tracing::info!("WebSocket RPC URL: {}", wss_rpc_url);
112            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
113        } else {
114            tracing::info!("Using HyperSync for live data (no WebSocket RPC)");
115            None
116        };
117        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
118            config.http_rpc_url.clone(),
119            config.rpc_requests_per_second,
120        ));
121        let erc20_contract = Erc20Contract::new(
122            http_rpc_client.clone(),
123            config.pool_filters.remove_pools_with_empty_erc20fields,
124        );
125
126        let hypersync_client =
127            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
128        Self {
129            chain,
130            config,
131            rpc_client,
132            tokens: erc20_contract,
133            univ3_pool: UniswapV3PoolContract::new(http_rpc_client.clone()),
134            cache,
135            hypersync_client,
136            subscription_manager: DefiDataSubscriptionManager::new(),
137            data_tx,
138            cancellation_token,
139        }
140    }
141
142    /// Initializes the database connection for the blockchain cache.
143    pub async fn initialize_cache_database(&mut self) {
144        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
145            tracing::info!(
146                "Initializing blockchain cache on database '{}'",
147                pg_connect_options.database
148            );
149            self.cache
150                .initialize_database(pg_connect_options.clone().into())
151                .await;
152        }
153    }
154
155    /// Creates an appropriate blockchain RPC client for the specified blockchain.
156    fn initialize_rpc_client(
157        blockchain: Blockchain,
158        wss_rpc_url: String,
159    ) -> BlockchainRpcClientAny {
160        match blockchain {
161            Blockchain::Ethereum => {
162                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
163            }
164            Blockchain::Polygon => {
165                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
166            }
167            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
168            Blockchain::Arbitrum => {
169                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
170            }
171            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
172        }
173    }
174
175    /// Establishes connections to all configured data sources and initializes the cache.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if cache initialization or connection setup fails.
180    pub async fn connect(&mut self) -> anyhow::Result<()> {
181        tracing::info!(
182            "Connecting blockchain data client for '{}'",
183            self.chain.name
184        );
185        self.initialize_cache_database().await;
186
187        if let Some(ref mut rpc_client) = self.rpc_client {
188            rpc_client.connect().await?;
189        }
190
191        let from_block = self.determine_from_block();
192
193        tracing::info!(
194            "Connecting to blockchain data source for '{}' from block {}",
195            self.chain.name,
196            from_block.separate_with_commas()
197        );
198
199        // Initialize the chain and register the Dex exchanges in the cache.
200        self.cache.initialize_chain().await;
201        // Import the cached blockchain data.
202        self.cache.connect(from_block).await?;
203        // TODO disable block syncing for now as we don't have timestamps yet configured
204        // Sync the remaining blocks which are missing.
205        // self.sync_blocks(Some(from_block), None).await?;
206        for dex in self.config.dex_ids.clone() {
207            self.register_dex_exchange(dex).await?;
208            self.sync_exchange_pools(&dex, from_block, None, false)
209                .await?;
210        }
211
212        Ok(())
213    }
214
215    /// Syncs blocks with consistency checks to ensure data integrity.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if block syncing fails or if consistency checks fail.
220    pub async fn sync_blocks_checked(
221        &mut self,
222        from_block: u64,
223        to_block: Option<u64>,
224    ) -> anyhow::Result<()> {
225        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
226            // If blocks are consistent proceed with copy command.
227            if blocks_status.is_consistent() {
228                tracing::info!(
229                    "Cache is consistent: no gaps detected (last continuous block: {})",
230                    blocks_status.last_continuous_block
231                );
232                let target_block = max(blocks_status.max_block + 1, from_block);
233                tracing::info!(
234                    "Starting fast sync with COPY from block {}",
235                    target_block.separate_with_commas()
236                );
237                self.sync_blocks(target_block, to_block, true).await?;
238            } else {
239                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
240                tracing::info!(
241                    "Cache inconsistency detected: {} blocks missing between {} and {}",
242                    gap_size,
243                    blocks_status.last_continuous_block + 1,
244                    blocks_status.max_block
245                );
246
247                tracing::info!(
248                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
249                    blocks_status.last_continuous_block + 1,
250                    blocks_status.max_block
251                );
252                self.sync_blocks(
253                    blocks_status.last_continuous_block + 1,
254                    Some(blocks_status.max_block),
255                    false,
256                )
257                .await?;
258
259                tracing::info!(
260                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
261                    (blocks_status.max_block + 1).separate_with_commas()
262                );
263                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
264                    .await?;
265            }
266        } else {
267            self.sync_blocks(from_block, to_block, true).await?;
268        }
269
270        Ok(())
271    }
272
273    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if block fetching, caching, or database operations fail.
278    pub async fn sync_blocks(
279        &mut self,
280        from_block: u64,
281        to_block: Option<u64>,
282        use_copy_command: bool,
283    ) -> anyhow::Result<()> {
284        let to_block = if let Some(block) = to_block {
285            block
286        } else {
287            self.hypersync_client.current_block().await
288        };
289        let total_blocks = to_block.saturating_sub(from_block) + 1;
290        tracing::info!(
291            "Syncing blocks from {} to {} (total: {} blocks)",
292            from_block.separate_with_commas(),
293            to_block.separate_with_commas(),
294            total_blocks.separate_with_commas()
295        );
296
297        // Enable performance settings for sync operations
298        if let Err(e) = self.cache.toggle_performance_settings(true).await {
299            tracing::warn!("Failed to enable performance settings: {e}");
300        }
301
302        let blocks_stream = self
303            .hypersync_client
304            .request_blocks_stream(from_block, Some(to_block))
305            .await;
306
307        tokio::pin!(blocks_stream);
308
309        let mut metrics = BlockchainSyncReporter::new(
310            BlockchainSyncReportItems::Blocks,
311            from_block,
312            total_blocks,
313            BLOCKS_PROCESS_IN_SYNC_REPORT,
314        );
315
316        // Batch configuration
317        const BATCH_SIZE: usize = 1000;
318        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);
319
320        let cancellation_token = self.cancellation_token.clone();
321        let sync_result = tokio::select! {
322            () = cancellation_token.cancelled() => {
323                tracing::info!("Block sync cancelled");
324                Err(anyhow::anyhow!("Sync cancelled"))
325            }
326            result = async {
327                while let Some(block) = blocks_stream.next().await {
328                    let block_number = block.number;
329                    if self.cache.get_block_timestamp(block_number).is_some() {
330                        continue;
331                    }
332                    batch.push(block);
333
334                    // Process batch when full or last block
335                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
336                        let batch_size = batch.len();
337
338                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
339                        metrics.update(batch_size);
340
341                        // Re-initialize batch vector
342                        batch = Vec::with_capacity(BATCH_SIZE);
343                    }
344
345                    // Log progress if needed
346                    if metrics.should_log_progress(block_number, to_block) {
347                        metrics.log_progress(block_number);
348                    }
349                }
350
351                // Process any remaining blocks
352                if !batch.is_empty() {
353                    let batch_size = batch.len();
354                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
355                    metrics.update(batch_size);
356                }
357
358                metrics.log_final_stats();
359                Ok(())
360            } => result
361        };
362
363        sync_result?;
364
365        // Restore default safe settings after sync completion
366        if let Err(e) = self.cache.toggle_performance_settings(false).await {
367            tracing::warn!("Failed to restore default settings: {e}");
368        }
369
370        Ok(())
371    }
372
373    /// Synchronizes all events for a specific pool within the given block range.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if event syncing, parsing, or database operations fail.
378    pub async fn sync_pool_events(
379        &mut self,
380        dex: &DexType,
381        pool_address: &Address,
382        from_block: Option<u64>,
383        to_block: Option<u64>,
384        reset: bool,
385    ) -> anyhow::Result<()> {
386        let pool: SharedPool = self.get_pool(&pool_address)?.clone();
387        let pool_display = pool.to_full_spec_string();
388        let from_block = from_block.unwrap_or(pool.creation_block);
389
390        let (last_synced_block, effective_from_block) = if reset {
391            (None, from_block)
392        } else {
393            let last_synced_block = self
394                .cache
395                .get_pool_last_synced_block(dex, &pool_address)
396                .await?;
397            let effective_from_block = last_synced_block
398                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
399            (last_synced_block, effective_from_block)
400        };
401
402        let to_block = match to_block {
403            Some(block) => block,
404            None => self.hypersync_client.current_block().await,
405        };
406
407        // Skip sync if we're already up to date
408        if effective_from_block > to_block {
409            tracing::info!(
410                "D {} already synced to block {} (current: {}), skipping sync",
411                dex,
412                last_synced_block.unwrap_or(0).separate_with_commas(),
413                to_block.separate_with_commas()
414            );
415            return Ok(());
416        }
417
418        // Query table max blocks to detect last blocks to use batch insert before that, then COPY command.
419        let last_block_across_pool_events_table = self
420            .cache
421            .get_pool_event_tables_last_block(&pool_address)
422            .await?;
423
424        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
425        tracing::info!(
426            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
427            pool_display,
428            effective_from_block.separate_with_commas(),
429            to_block.separate_with_commas(),
430            total_blocks.separate_with_commas(),
431            if let Some(last_synced) = last_synced_block {
432                format!(
433                    " - resuming from last synced block {}",
434                    last_synced.separate_with_commas()
435                )
436            } else {
437                String::new()
438            }
439        );
440
441        let mut metrics = BlockchainSyncReporter::new(
442            BlockchainSyncReportItems::PoolEvents,
443            effective_from_block,
444            total_blocks,
445            BLOCKS_PROCESS_IN_SYNC_REPORT,
446        );
447        let dex_extended = self.get_dex_extended(dex)?.clone();
448        let swap_event_signature = dex_extended.swap_created_event.as_ref();
449        let mint_event_signature = dex_extended.mint_created_event.as_ref();
450        let burn_event_signature = dex_extended.burn_created_event.as_ref();
451        let collect_event_signature = dex_extended.collect_created_event.as_ref();
452        let flash_event_signature = dex_extended.flash_created_event.as_ref();
453        let initialize_event_signature: Option<&str> =
454            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());
455
456        // Pre-decode event signatures to bytes for efficient comparison
457        let swap_sig_bytes = hex::decode(
458            swap_event_signature
459                .strip_prefix("0x")
460                .unwrap_or(swap_event_signature),
461        )?;
462        let mint_sig_bytes = hex::decode(
463            mint_event_signature
464                .strip_prefix("0x")
465                .unwrap_or(mint_event_signature),
466        )?;
467        let burn_sig_bytes = hex::decode(
468            burn_event_signature
469                .strip_prefix("0x")
470                .unwrap_or(burn_event_signature),
471        )?;
472        let collect_sig_bytes = hex::decode(
473            collect_event_signature
474                .strip_prefix("0x")
475                .unwrap_or(collect_event_signature),
476        )?;
477        let flash_sig_bytes = flash_event_signature
478            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
479        let initialize_sig_bytes = initialize_event_signature
480            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
481
482        let mut event_signatures = vec![
483            swap_event_signature,
484            mint_event_signature,
485            burn_event_signature,
486            collect_event_signature,
487        ];
488        if let Some(event) = dex_extended.initialize_event.as_ref() {
489            event_signatures.push(event);
490        }
491        if let Some(event) = dex_extended.flash_created_event.as_ref() {
492            event_signatures.push(event);
493        }
494        let pool_events_stream = self
495            .hypersync_client
496            .request_contract_events_stream(
497                effective_from_block,
498                Some(to_block),
499                &pool_address,
500                event_signatures,
501            )
502            .await;
503        tokio::pin!(pool_events_stream);
504
505        let mut last_block_saved = effective_from_block;
506        let mut blocks_processed = 0;
507
508        // Batch configuration for events
509        const EVENT_BATCH_SIZE: usize = 20000;
510        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
511        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
512        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
513        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);
514
515        // Track when we've moved beyond stale data and can use COPY
516        let mut beyond_stale_data = last_block_across_pool_events_table
517            .map_or(true, |tables_max| effective_from_block > tables_max);
518
519        let cancellation_token = self.cancellation_token.clone();
520        let sync_result = tokio::select! {
521            () = cancellation_token.cancelled() => {
522                tracing::info!("Pool event sync cancelled");
523                Err(anyhow::anyhow!("Sync cancelled"))
524            }
525            result = async {
526                while let Some(log) = pool_events_stream.next().await {
527                    let block_number = extract_block_number(&log)?;
528                    blocks_processed += block_number - last_block_saved;
529                    last_block_saved = block_number;
530
531                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
532            if event_sig_bytes == swap_sig_bytes.as_slice() {
533                let swap_event = dex_extended.parse_swap_event(log)?;
534                match self.process_pool_swap_event(&swap_event, &pool, &dex_extended) {
535                    Ok(swap) => swap_batch.push(swap),
536                    Err(e) => tracing::error!("Failed to process swap event: {e}"),
537                }
538            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
539                let mint_event = dex_extended.parse_mint_event(log)?;
540                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
541                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
542                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
543                }
544            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
545                let burn_event = dex_extended.parse_burn_event(log)?;
546                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
547                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
548                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
549                }
550            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
551                let collect_event = dex_extended.parse_collect_event(log)?;
552                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
553                    Ok(fee_collect) => collect_batch.push(fee_collect),
554                    Err(e) => tracing::error!("Failed to process collect event: {e}"),
555                }
556            } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
557                let initialize_event = dex_extended.parse_initialize_event(log)?;
558                self.cache
559                    .update_pool_initialize_price_tick(&initialize_event)
560                    .await?;
561            } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
562                if let Some(parse_fn) = dex_extended.parse_flash_event_fn {
563                    match parse_fn(dex_extended.dex.clone(), log) {
564                        Ok(flash_event) => {
565                            match self.process_pool_flash_event(&flash_event, &pool) {
566                                Ok(flash) => flash_batch.push(flash),
567                                Err(e) => tracing::error!("Failed to process flash event: {e}"),
568                            }
569                        }
570                        Err(e) => tracing::error!("Failed to parse flash event: {e}"),
571                    }
572                }
573            } else {
574                let event_signature = hex::encode(event_sig_bytes);
575                tracing::error!(
576                    "Unexpected event signature: {} for log {:?}",
577                    event_signature,
578                    log
579                );
580            }
581
582            // Check if we've moved beyond stale data (transition point for strategy change)
583            if !beyond_stale_data
584                && last_block_across_pool_events_table
585                    .map_or(false, |table_max| block_number > table_max)
586            {
587                tracing::info!(
588                    "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
589                    block_number
590                );
591
592                // Flush all batches with ON CONFLICT to handle any remaining duplicates
593                self.flush_event_batches(
594                    EVENT_BATCH_SIZE,
595                    &mut swap_batch,
596                    &mut liquidity_batch,
597                    &mut collect_batch,
598                    &mut flash_batch,
599                    false,
600                    true,
601                )
602                .await?;
603
604                beyond_stale_data = true;
605                tracing::info!("Switched to COPY mode - future batches will use COPY command");
606            } else {
607                // Process batches when they reach batch size
608                self.flush_event_batches(
609                    EVENT_BATCH_SIZE,
610                    &mut swap_batch,
611                    &mut liquidity_batch,
612                    &mut collect_batch,
613                    &mut flash_batch,
614                    false, // TODO temporary dont use copy command
615                    false,
616                )
617                .await?;
618            }
619
620            metrics.update(blocks_processed as usize);
621            blocks_processed = 0;
622
623            // Log progress if needed
624            if metrics.should_log_progress(block_number, to_block) {
625                metrics.log_progress(block_number);
626                self.cache
627                    .update_pool_last_synced_block(dex, &pool_address, block_number)
628                    .await?;
629            }
630        }
631
632        self.flush_event_batches(
633            EVENT_BATCH_SIZE,
634            &mut swap_batch,
635            &mut liquidity_batch,
636            &mut collect_batch,
637            &mut flash_batch,
638            false,
639            true,
640        )
641        .await?;
642
643        metrics.log_final_stats();
644        self.cache
645            .update_pool_last_synced_block(dex, &pool_address, to_block)
646            .await?;
647
648        tracing::info!(
649            "Successfully synced Dex '{}' Pool '{}' up to block {}",
650            dex,
651            pool_display,
652            to_block.separate_with_commas()
653        );
654                Ok(())
655            } => result
656        };
657
658        sync_result
659    }
660
661    async fn flush_event_batches(
662        &mut self,
663        event_batch_size: usize,
664        swap_batch: &mut Vec<PoolSwap>,
665        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
666        collect_batch: &mut Vec<PoolFeeCollect>,
667        flash_batch: &mut Vec<PoolFlash>,
668        use_copy_command: bool,
669        force_flush_all: bool,
670    ) -> anyhow::Result<()> {
671        if force_flush_all || swap_batch.len() >= event_batch_size {
672            if !swap_batch.is_empty() {
673                self.cache
674                    .add_pool_swaps_batch(swap_batch, use_copy_command)
675                    .await?;
676                swap_batch.clear();
677            }
678        }
679        if force_flush_all || liquidity_batch.len() >= event_batch_size {
680            if !liquidity_batch.is_empty() {
681                self.cache
682                    .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
683                    .await?;
684                liquidity_batch.clear();
685            }
686        }
687        if force_flush_all || collect_batch.len() >= event_batch_size {
688            if !collect_batch.is_empty() {
689                self.cache
690                    .add_pool_fee_collects_batch(collect_batch, use_copy_command)
691                    .await?;
692                collect_batch.clear();
693            }
694        }
695        if force_flush_all || flash_batch.len() >= event_batch_size {
696            if !flash_batch.is_empty() {
697                self.cache.add_pool_flash_batch(flash_batch).await?;
698                flash_batch.clear();
699            }
700        }
701        Ok(())
702    }
703
704    /// Returns an error if swap event processing fails.
705    ///
706    /// # Panics
707    ///
708    /// Panics if swap event conversion to trade data fails.
709    pub fn process_pool_swap_event(
710        &self,
711        swap_event: &SwapEvent,
712        pool: &SharedPool,
713        dex_extended: &DexExtended,
714    ) -> anyhow::Result<PoolSwap> {
715        let timestamp = self
716            .cache
717            .get_block_timestamp(swap_event.block_number)
718            .copied();
719        let (side, size, price) =
720            dex_extended.convert_to_trade_data(&pool.token0, &pool.token1, swap_event)?;
721        let swap = swap_event.to_pool_swap(
722            self.chain.clone(),
723            pool.instrument_id,
724            pool.address,
725            Some(side),
726            Some(size),
727            Some(price),
728            timestamp,
729        );
730
731        // TODO add caching and persisting of swaps, resolve block timestamps sync
732        // self.cache.add_pool_swap(&swap).await?;
733
734        Ok(swap)
735    }
736
737    /// Processes a mint event (liquidity addition) and converts it to a `PoolLiquidityUpdate`.
738    ///
739    /// # Errors
740    ///
741    /// Returns an error if mint event processing fails or if the liquidity update creation fails.
742    pub fn process_pool_mint_event(
743        &self,
744        mint_event: &MintEvent,
745        pool: &SharedPool,
746        dex_extended: &DexExtended,
747    ) -> anyhow::Result<PoolLiquidityUpdate> {
748        let timestamp = self
749            .cache
750            .get_block_timestamp(mint_event.block_number)
751            .copied();
752
753        let liquidity_update = mint_event.to_pool_liquidity_update(
754            self.chain.clone(),
755            dex_extended.dex.clone(),
756            pool.instrument_id,
757            pool.address,
758            timestamp,
759        );
760
761        // self.cache.add_liquidity_update(&liquidity_update).await?;
762
763        Ok(liquidity_update)
764    }
765
766    /// Processes a burn event (liquidity removal) and converts it to a `PoolLiquidityUpdate`.
767    /// Processes a pool burn event and converts it to a liquidity update.
768    ///
769    /// # Errors
770    ///
771    /// Returns an error if the burn event processing fails or if the liquidity update creation fails.
772    pub fn process_pool_burn_event(
773        &self,
774        burn_event: &BurnEvent,
775        pool: &SharedPool,
776        dex_extended: &DexExtended,
777    ) -> anyhow::Result<PoolLiquidityUpdate> {
778        let timestamp = self
779            .cache
780            .get_block_timestamp(burn_event.block_number)
781            .copied();
782
783        let liquidity_update = burn_event.to_pool_liquidity_update(
784            self.chain.clone(),
785            dex_extended.dex.clone(),
786            pool.instrument_id,
787            pool.address,
788            timestamp,
789        );
790
791        // self.cache.add_liquidity_update(&liquidity_update).await?;
792
793        Ok(liquidity_update)
794    }
795
796    /// Processes a pool collect event and converts it to a fee collection.
797    ///
798    /// # Errors
799    ///
800    /// Returns an error if the collect event processing fails or if the fee collection creation fails.
801    pub fn process_pool_collect_event(
802        &self,
803        collect_event: &CollectEvent,
804        pool: &SharedPool,
805        dex_extended: &DexExtended,
806    ) -> anyhow::Result<PoolFeeCollect> {
807        let timestamp = self
808            .cache
809            .get_block_timestamp(collect_event.block_number)
810            .copied();
811
812        let fee_collect = collect_event.to_pool_fee_collect(
813            self.chain.clone(),
814            dex_extended.dex.clone(),
815            pool.instrument_id,
816            pool.address,
817            timestamp,
818        );
819
820        Ok(fee_collect)
821    }
822
823    /// Processes a pool flash event and converts it to a flash loan.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the flash event processing fails or if the flash loan creation fails.
828    pub fn process_pool_flash_event(
829        &self,
830        flash_event: &FlashEvent,
831        pool: &SharedPool,
832    ) -> anyhow::Result<PoolFlash> {
833        let timestamp = self
834            .cache
835            .get_block_timestamp(flash_event.block_number)
836            .copied();
837
838        let flash = flash_event.to_pool_flash(
839            self.chain.clone(),
840            pool.instrument_id,
841            pool.address,
842            timestamp,
843        );
844
845        Ok(flash)
846    }
847
848    /// Synchronizes all pools and their tokens for a specific DEX within the given block range.
849    ///
850    /// This method performs a comprehensive sync of:
851    /// 1. Pool creation events from the DEX factory
852    /// 2. Token metadata for all tokens in discovered pools
853    /// 3. Pool entities with proper token associations
854    ///
855    /// # Errors
856    ///
857    /// Returns an error if syncing pools, tokens, or DEX operations fail.
858    pub async fn sync_exchange_pools(
859        &mut self,
860        dex: &DexType,
861        from_block: u64,
862        to_block: Option<u64>,
863        reset: bool,
864    ) -> anyhow::Result<()> {
865        // Check for last synced block and use it as starting point if higher (unless reset is true)
866        let (last_synced_block, effective_from_block) = if reset {
867            (None, from_block)
868        } else {
869            let last_synced_block = self.cache.get_dex_last_synced_block(dex).await?;
870            let effective_from_block = last_synced_block
871                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
872            (last_synced_block, effective_from_block)
873        };
874
875        let to_block = match to_block {
876            Some(block) => block,
877            None => self.hypersync_client.current_block().await,
878        };
879
880        // Skip sync if we're already up to date
881        if effective_from_block > to_block {
882            tracing::info!(
883                "DEX {} already synced to block {} (current: {}), skipping sync",
884                dex,
885                last_synced_block.unwrap_or(0).separate_with_commas(),
886                to_block.separate_with_commas()
887            );
888            return Ok(());
889        }
890
891        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
892        tracing::info!(
893            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
894            effective_from_block.separate_with_commas(),
895            to_block.separate_with_commas(),
896            total_blocks.separate_with_commas(),
897            if let Some(last_synced) = last_synced_block {
898                format!(
899                    " - resuming from last synced block {}",
900                    last_synced.separate_with_commas()
901                )
902            } else {
903                String::new()
904            }
905        );
906
907        // Enable performance settings for sync operations
908        if let Err(e) = self.cache.toggle_performance_settings(true).await {
909            tracing::warn!("Failed to enable performance settings: {e}");
910        }
911
912        let mut metrics = BlockchainSyncReporter::new(
913            BlockchainSyncReportItems::PoolCreatedEvents,
914            effective_from_block,
915            total_blocks,
916            BLOCKS_PROCESS_IN_SYNC_REPORT,
917        );
918
919        let dex = self.get_dex_extended(dex)?.clone();
920        let factory_address = &dex.factory;
921        let pair_created_event_signature = dex.pool_created_event.as_ref();
922        let pools_stream = self
923            .hypersync_client
924            .request_contract_events_stream(
925                effective_from_block,
926                Some(to_block),
927                factory_address,
928                vec![pair_created_event_signature],
929            )
930            .await;
931
932        tokio::pin!(pools_stream);
933
934        // LEVEL 1: RPC buffers (small, constrained by rate limits)
935        let token_rpc_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
936        let mut token_rpc_buffer: HashSet<Address> = HashSet::new();
937
938        // LEVEL 2: DB buffers (large, optimize for throughput)
939        const POOL_DB_BATCH_SIZE: usize = 2000;
940        let mut token_db_buffer: Vec<Token> = Vec::new();
941        let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();
942
943        let mut last_block_saved = effective_from_block;
944
945        let cancellation_token = self.cancellation_token.clone();
946        let sync_result = tokio::select! {
947            () = cancellation_token.cancelled() => {
948                tracing::info!("Exchange pool sync cancelled");
949                Err(anyhow::anyhow!("Sync cancelled"))
950            }
951            result = async {
952                while let Some(log) = pools_stream.next().await {
953                    let block_number = extract_block_number(&log)?;
954                    let blocks_progress = block_number - last_block_saved;
955                    last_block_saved = block_number;
956
957                    let pool = dex.parse_pool_created_event(log)?;
958                    if self.cache.get_pool(&pool.pool_address).is_some() {
959                        // Pool is already initialized and cached.
960                        continue;
961                    }
962
963                    if self.cache.is_invalid_token(&pool.token0)
964                        || self.cache.is_invalid_token(&pool.token1)
965                    {
966                        // Skip pools with invalid tokens as they cannot be properly processed or traded.
967                        continue;
968                    }
969
970                    // Collect tokens needed for RPC fetch
971                    if self.cache.get_token(&pool.token0).is_none() {
972                        token_rpc_buffer.insert(pool.token0);
973                    }
974                    if self.cache.get_token(&pool.token1).is_none() {
975                        token_rpc_buffer.insert(pool.token1);
976                    }
977
978                    // Buffer the pool for later processing
979                    pool_events_buffer.push(pool);
980
981                    // ==== RPC FLUSHING (small batches) ====
982                    if token_rpc_buffer.len() >= token_rpc_batch_size {
983                        let fetched_tokens = self
984                            .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
985                            .await?;
986
987                        // Accumulate for later DB write
988                        token_db_buffer.extend(fetched_tokens);
989                    }
990
991                    // ==== DB FLUSHING (large batches) ====
992                    // Process pools when buffer is full
993                    if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
994                        // 1. Fetch any remaining tokens in RPC buffer (needed for pool construction)
995                        if !token_rpc_buffer.is_empty() {
996                            let fetched_tokens = self
997                                .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
998                                .await?;
999                            token_db_buffer.extend(fetched_tokens);
1000                        }
1001
1002                        // 2. Flush ALL tokens to DB (satisfy foreign key constraints)
1003                        if !token_db_buffer.is_empty() {
1004                            self.cache
1005                                .add_tokens_batch(token_db_buffer.drain(..).collect())
1006                                .await?;
1007                        }
1008
1009                        // 3. Now safe to construct and flush pools
1010                        let pools = self
1011                            .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
1012                            .await?;
1013                        self.cache.add_pools_batch(pools).await?;
1014                    }
1015
1016                    metrics.update(blocks_progress as usize);
1017                    // Log progress if needed
1018                    if metrics.should_log_progress(block_number, to_block) {
1019                        metrics.log_progress(block_number);
1020                    }
1021                }
1022
1023                // ==== FINAL FLUSH (all remaining data) ====
1024                // 1. Fetch any remaining tokens
1025                if !token_rpc_buffer.is_empty() {
1026                    let fetched_tokens = self
1027                        .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
1028                        .await?;
1029                    token_db_buffer.extend(fetched_tokens);
1030                }
1031
1032                // 2. Flush all tokens to DB
1033                if !token_db_buffer.is_empty() {
1034                    self.cache
1035                        .add_tokens_batch(token_db_buffer.drain(..).collect())
1036                        .await?;
1037                }
1038
1039                // 3. Process and flush all pools
1040                if !pool_events_buffer.is_empty() {
1041                    let pools = self
1042                        .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
1043                        .await?;
1044                    self.cache.add_pools_batch(pools).await?;
1045                }
1046
1047                metrics.log_final_stats();
1048
1049                // Update the last synced block after successful completion.
1050                self.cache
1051                    .update_dex_last_synced_block(&dex.dex.name, to_block)
1052                    .await?;
1053
1054                tracing::info!(
1055                    "Successfully synced DEX {} pools up to block {}",
1056                    dex.dex.name,
1057                    to_block.separate_with_commas()
1058                );
1059
1060                Ok(())
1061            } => result
1062        };
1063
1064        sync_result?;
1065
1066        // Restore default safe settings after sync completion
1067        if let Err(e) = self.cache.toggle_performance_settings(false).await {
1068            tracing::warn!("Failed to restore default settings: {e}");
1069        }
1070
1071        Ok(())
1072    }
1073
1074    /// Fetches token metadata via RPC and updates in-memory cache immediately.
1075    ///
1076    /// This method fetches token information using multicall, updates the in-memory cache right away
1077    /// (so pool construction can proceed), and returns valid tokens for later batch DB writes.
1078    ///
1079    /// # Errors
1080    ///
1081    /// Returns an error if the RPC multicall fails or database operations fail.
1082    async fn fetch_and_cache_tokens_in_memory(
1083        &mut self,
1084        token_buffer: &mut HashSet<Address>,
1085    ) -> anyhow::Result<Vec<Token>> {
1086        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
1087        let token_infos = self.tokens.batch_fetch_token_info(&batch_addresses).await?;
1088
1089        let mut valid_tokens = Vec::new();
1090
1091        for (token_address, token_info) in token_infos {
1092            match token_info {
1093                Ok(token_info) => {
1094                    let token = Token::new(
1095                        self.chain.clone(),
1096                        token_address,
1097                        token_info.name,
1098                        token_info.symbol,
1099                        token_info.decimals,
1100                    );
1101
1102                    // Update in-memory cache IMMEDIATELY (so construct_pool can read it)
1103                    self.cache.insert_token_in_memory(token.clone());
1104
1105                    // Collect for LATER DB write
1106                    valid_tokens.push(token);
1107                }
1108                Err(token_info_error) => {
1109                    self.cache.insert_invalid_token_in_memory(token_address);
1110                    if let Some(database) = &self.cache.database {
1111                        database
1112                            .add_invalid_token(
1113                                self.chain.chain_id,
1114                                &token_address,
1115                                &token_info_error.to_string(),
1116                            )
1117                            .await?;
1118                    }
1119                }
1120            }
1121        }
1122
1123        Ok(valid_tokens)
1124    }
1125
1126    /// Constructs multiple pools from pool creation events.
1127    ///
1128    /// Assumes all required tokens are already in the in-memory cache.
1129    ///
1130    /// # Errors
1131    ///
1132    /// Logs errors for pools that cannot be constructed (missing tokens),
1133    /// but does not fail the entire batch.
1134    async fn construct_pools_batch(
1135        &mut self,
1136        pool_events: &mut Vec<PoolCreatedEvent>,
1137        dex: &SharedDex,
1138    ) -> anyhow::Result<Vec<Pool>> {
1139        let mut pools = Vec::with_capacity(pool_events.len());
1140
1141        for pool_event in pool_events.drain(..) {
1142            // Both tokens should be in cache now
1143            let token0 = match self.cache.get_token(&pool_event.token0) {
1144                Some(token) => token.clone(),
1145                None => {
1146                    if !self.cache.is_invalid_token(&pool_event.token0) {
1147                        tracing::warn!(
1148                            "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
1149                            pool_event.pool_address,
1150                            pool_event.token0
1151                        );
1152                    }
1153                    continue;
1154                }
1155            };
1156
1157            let token1 = match self.cache.get_token(&pool_event.token1) {
1158                Some(token) => token.clone(),
1159                None => {
1160                    if !self.cache.is_invalid_token(&pool_event.token1) {
1161                        tracing::warn!(
1162                            "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
1163                            pool_event.pool_address,
1164                            pool_event.token1
1165                        );
1166                    }
1167                    continue;
1168                }
1169            };
1170
1171            let pool = Pool::new(
1172                self.chain.clone(),
1173                dex.clone(),
1174                pool_event.pool_address,
1175                pool_event.block_number,
1176                token0,
1177                token1,
1178                pool_event.fee,
1179                pool_event.tick_spacing,
1180                UnixNanos::default(),
1181            );
1182
1183            pools.push(pool);
1184        }
1185
1186        Ok(pools)
1187    }
1188
1189    /// Registers a decentralized exchange for data collection and event monitoring.
1190    ///
1191    /// Registration involves:
1192    /// 1. Adding the DEX to the cache
1193    /// 2. Loading existing pools for the DEX
1194    /// 3. Configuring event signatures for subscriptions
1195    ///
1196    /// # Errors
1197    ///
1198    /// Returns an error if DEX registration, cache operations, or pool loading fails.
1199    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
1200        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
1201            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);
1202
1203            self.cache.add_dex(dex_extended.dex.clone()).await?;
1204            let _ = self.cache.load_pools(&dex_id).await?;
1205
1206            self.subscription_manager.register_dex_for_subscriptions(
1207                dex_id,
1208                dex_extended.swap_created_event.as_ref(),
1209                dex_extended.mint_created_event.as_ref(),
1210                dex_extended.burn_created_event.as_ref(),
1211                dex_extended.collect_created_event.as_ref(),
1212                dex_extended.flash_created_event.as_deref(),
1213            );
1214            Ok(())
1215        } else {
1216            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
1217        }
1218    }
1219
1220    /// Bootstraps a [`PoolProfiler`] with the latest state for a given pool.
1221    ///
1222    /// Uses two paths depending on whether the pool has been synced to the database:
1223    /// - **Never synced**: Streams events from HyperSync → restores from on-chain RPC → returns `(profiler, true)`
1224    /// - **Previously synced**: Syncs new events to DB → streams from DB → returns `(profiler, false)`
1225    ///
1226    /// Both paths restore from the latest valid snapshot first (if available), otherwise initialize with pool's initial price.
1227    ///
1228    /// # Returns
1229    ///
1230    /// - `PoolProfiler`: Hydrated profiler with current pool state
1231    /// - `bool`: `true` if constructed from RPC (already valid), `false` if from DB (needs validation)
1232    ///
1233    /// # Errors
1234    ///
1235    /// Returns an error if database is not initialized or event processing fails.
1236    pub async fn bootstrap_latest_pool_profiler(
1237        &mut self,
1238        pool: &SharedPool,
1239    ) -> anyhow::Result<(PoolProfiler, bool)> {
1240        tracing::info!(
1241            "Bootstrapping latest pool profiler for pool {}",
1242            pool.address
1243        );
1244
1245        if self.cache.database.is_none() {
1246            anyhow::bail!(
1247                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
1248            );
1249        }
1250
1251        let mut profiler = PoolProfiler::new(pool.clone());
1252
1253        // Calculate latest valid block position after which we need to start profiling.
1254        let from_position = match self
1255            .cache
1256            .database
1257            .as_ref()
1258            .unwrap()
1259            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.address)
1260            .await
1261        {
1262            Ok(Some(snapshot)) => {
1263                tracing::info!(
1264                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
1265                    snapshot.block_position.number.separate_with_commas(),
1266                    snapshot.positions.len(),
1267                    snapshot.ticks.len()
1268                );
1269                let block_position = snapshot.block_position.clone();
1270                profiler.restore_from_snapshot(snapshot)?;
1271                tracing::info!("Restored profiler from snapshot");
1272                Some(block_position)
1273            }
1274            _ => {
1275                tracing::info!("No valid snapshot found, processing from beginning");
1276                None
1277            }
1278        };
1279
1280        // If we don't have never synced pool events, proceed with faster
1281        // construction of pool profiler from hypersync and RPC, where we
1282        // dont need syncing of pool events and fetching it from database
1283        if self
1284            .cache
1285            .database
1286            .as_ref()
1287            .unwrap()
1288            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.address)
1289            .await?
1290            .is_none()
1291        {
1292            return self
1293                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
1294                .await;
1295        }
1296
1297        // Sync the pool events before bootstrapping of pool profiler
1298        if let Err(e) = self
1299            .sync_pool_events(&pool.dex.name, &pool.address, None, None, false)
1300            .await
1301        {
1302            tracing::error!("Failed to sync pool events for snapshot request: {}", e);
1303        }
1304
1305        if !profiler.is_initialized {
1306            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
1307                profiler.initialize(initial_sqrt_price_x96);
1308            } else {
1309                anyhow::bail!(
1310                    "Pool is not initialized and it doesn't contain initial price, cannot bootstrap profiler"
1311                );
1312            }
1313        }
1314
1315        let from_block = from_position
1316            .as_ref()
1317            .map(|block_position| block_position.number)
1318            .unwrap_or(profiler.pool.creation_block);
1319        let to_block = self.hypersync_client.current_block().await;
1320        let total_blocks = to_block.saturating_sub(from_block) + 1;
1321
1322        // Enable embedded profiler reporting
1323        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1324
1325        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
1326            pool.chain.clone(),
1327            pool.dex.clone(),
1328            pool.instrument_id,
1329            &pool.address,
1330            from_position.clone(),
1331        );
1332
1333        while let Some(result) = stream.next().await {
1334            match result {
1335                Ok(event) => {
1336                    profiler.process(&event)?;
1337                }
1338                Err(e) => log::error!("Error processing event: {}", e),
1339            }
1340        }
1341
1342        profiler.finalize_reporting();
1343
1344        Ok((profiler, false))
1345    }
1346
1347    /// Constructs a pool profiler by fetching events directly from HyperSync RPC.
1348    ///
1349    /// This method is used when the pool has never been synced to the database. It streams
1350    /// liquidity events (mints, burns) directly from HyperSync and processes them
1351    /// to build up the profiler's state in real-time. After processing all events, it
1352    /// restores the profiler from the current on-chain state with the provided ticks and positions
1353    ///
1354    /// # Returns
1355    ///
1356    /// Returns a tuple of:
1357    /// - `PoolProfiler`: The hydrated profiler with state built from events
1358    /// - `bool`: Always `true` to indicate the profiler state was valid, and it was constructed from RPC
1359    ///
1360    /// # Errors
1361    ///
1362    /// Returns an error if:
1363    /// - Event streaming from HyperSync fails
1364    /// - Event parsing or processing fails
1365    /// - DEX configuration is invalid
1366    async fn construct_pool_profiler_from_hypersync_rpc(
1367        &self,
1368        mut profiler: PoolProfiler,
1369        from_position: Option<BlockPosition>,
1370    ) -> anyhow::Result<(PoolProfiler, bool)> {
1371        tracing::info!(
1372            "Constructing pool profiler from hypersync stream and RPC final state querying"
1373        );
1374        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
1375        let mint_event_signature = dex_extended.mint_created_event.as_ref();
1376        let burn_event_signature = dex_extended.burn_created_event.as_ref();
1377        let initialize_event_signature =
1378            if let Some(initialize_event) = &dex_extended.initialize_event {
1379                initialize_event.as_ref()
1380            } else {
1381                anyhow::bail!(
1382                    "DEX {} does not have initialize event set.",
1383                    &profiler.pool.dex.name
1384                );
1385            };
1386        let mint_sig_bytes = hex::decode(
1387            mint_event_signature
1388                .strip_prefix("0x")
1389                .unwrap_or(mint_event_signature),
1390        )?;
1391        let burn_sig_bytes = hex::decode(
1392            burn_event_signature
1393                .strip_prefix("0x")
1394                .unwrap_or(burn_event_signature),
1395        )?;
1396        let initialize_sig_bytes = hex::decode(
1397            initialize_event_signature
1398                .strip_prefix("0x")
1399                .unwrap_or(initialize_event_signature),
1400        )?;
1401
1402        let from_block = from_position
1403            .map(|block_position| block_position.number)
1404            .unwrap_or(profiler.pool.creation_block);
1405        let to_block = self.hypersync_client.current_block().await;
1406        let total_blocks = to_block.saturating_sub(from_block) + 1;
1407
1408        tracing::info!(
1409            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
1410            profiler.pool.address,
1411            from_block.separate_with_commas(),
1412            to_block.separate_with_commas(),
1413            total_blocks.separate_with_commas()
1414        );
1415
1416        // Enable embedded profiler reporting
1417        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1418
1419        let pool_events_stream = self
1420            .hypersync_client
1421            .request_contract_events_stream(
1422                from_block,
1423                None,
1424                &profiler.pool.address,
1425                vec![
1426                    mint_event_signature,
1427                    burn_event_signature,
1428                    initialize_event_signature,
1429                ],
1430            )
1431            .await;
1432        tokio::pin!(pool_events_stream);
1433
1434        while let Some(log) = pool_events_stream.next().await {
1435            let event_sig_bytes = extract_event_signature_bytes(&log)?;
1436
1437            if event_sig_bytes == initialize_sig_bytes {
1438                let initialize_event = dex_extended.parse_initialize_event(log)?;
1439                profiler.initialize(initialize_event.sqrt_price_x96);
1440                self.cache
1441                    .database
1442                    .as_ref()
1443                    .unwrap()
1444                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
1445                    .await?;
1446            } else if event_sig_bytes == mint_sig_bytes {
1447                let mint_event = dex_extended.parse_mint_event(log)?;
1448                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
1449                    Ok(liquidity_update) => {
1450                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1451                    }
1452                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
1453                }
1454            } else if event_sig_bytes == burn_sig_bytes {
1455                let burn_event = dex_extended.parse_burn_event(log)?;
1456                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
1457                    Ok(liquidity_update) => {
1458                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1459                    }
1460                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
1461                }
1462            } else {
1463                let event_signature = hex::encode(event_sig_bytes);
1464                tracing::error!(
1465                    "Unexpected event signature in bootstrap_latest_pool_profiler: {} for log {:?}",
1466                    event_signature,
1467                    log
1468                );
1469            }
1470        }
1471
1472        profiler.finalize_reporting();
1473
1474        // Hydrate from the current RPC state
1475        match self.get_on_chain_snapshot(&profiler).await {
1476            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
1477            Err(e) => tracing::error!(
1478                "Failed to restore from on-chain snapshot: {e}. Sending not hydrated state to client."
1479            ),
1480        }
1481
1482        Ok((profiler, true))
1483    }
1484
1485    /// Validates a pool profiler's state against on-chain data for accuracy verification.
1486    ///
1487    /// This method performs integrity checking by comparing the profiler's internal state
1488    /// (positions, ticks, liquidity) with the actual on-chain smart contract state. For UniswapV3
1489    /// pools, it fetches current on-chain data and verifies that the profiler's tracked state matches.
1490    /// If validation succeeds or is bypassed, the snapshot is marked as valid in the database.
1491    ///
1492    /// # Errors
1493    ///
1494    /// Returns an error if database operations fail when marking the snapshot as valid.
1495    pub async fn check_snapshot_validity(
1496        &self,
1497        profiler: &PoolProfiler,
1498        already_validated: bool,
1499    ) -> anyhow::Result<bool> {
1500        // Determine validity and get block position for marking
1501        let (is_valid, block_position) = if already_validated {
1502            // Skip RPC call - profiler was validated during construction from RPC
1503            tracing::info!("Snapshot already validated from RPC, skipping on-chain comparison");
1504            let last_event = profiler
1505                .last_processed_event
1506                .clone()
1507                .expect("Profiler should have last_processed_event");
1508            (true, last_event)
1509        } else {
1510            // Fetch on-chain state and compare
1511            match self.get_on_chain_snapshot(profiler).await {
1512                Ok(on_chain_snapshot) => {
1513                    tracing::info!("Comparing profiler state with on-chain state...");
1514                    let valid = compare_pool_profiler(&profiler, &on_chain_snapshot);
1515                    if !valid {
1516                        tracing::error!(
1517                            "Pool profiler state does NOT match on-chain smart contract state"
1518                        );
1519                    }
1520                    (valid, on_chain_snapshot.block_position)
1521                }
1522                Err(e) => {
1523                    tracing::error!("Failed to check snapshot validity: {e}");
1524                    return Ok(false);
1525                }
1526            }
1527        };
1528
1529        // Mark snapshot as valid in database if validation passed
1530        if is_valid {
1531            if let Some(cache_database) = &self.cache.database {
1532                cache_database
1533                    .mark_pool_snapshot_valid(
1534                        profiler.pool.chain.chain_id,
1535                        &profiler.pool.address,
1536                        block_position.number,
1537                        block_position.transaction_index,
1538                        block_position.log_index,
1539                    )
1540                    .await?;
1541                tracing::info!("Marked pool profiler snapshot as valid");
1542            }
1543        }
1544
1545        Ok(is_valid)
1546    }
1547
1548    /// Fetches current on-chain pool state at the last processed block.
1549    ///
1550    /// Queries the pool smart contract to retrieve active tick liquidity and position data,
1551    /// using the profiler's active positions and last processed block number.
1552    /// Used for profiler state restoration after bootstrapping and validation.
1553    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
1554        if profiler.pool.dex.name == DexType::UniswapV3 {
1555            let last_processed_event = profiler
1556                .last_processed_event
1557                .clone()
1558                .expect("We expect at least one processed event in the pool");
1559            let on_chain_snapshot = self
1560                .univ3_pool
1561                .fetch_snapshot(
1562                    &profiler.pool.address,
1563                    profiler.pool.instrument_id,
1564                    profiler.get_active_tick_values().as_slice(),
1565                    &profiler.get_all_position_keys(),
1566                    last_processed_event,
1567                )
1568                .await?;
1569
1570            Ok(on_chain_snapshot)
1571        } else {
1572            anyhow::bail!(
1573                "Fetching on-chain snapshot for Dex protocol {} is not supported yet.",
1574                profiler.pool.dex.name
1575            )
1576        }
1577    }
1578
1579    /// Replays historical events for a pool to hydrate its profiler state.
1580    ///
1581    /// Streams all historical swap, liquidity, and fee collect events from the database
1582    /// and sends them through the normal data event pipeline to build up pool profiler state.
1583    ///
1584    /// # Errors
1585    ///
1586    /// Returns an error if database streaming fails or event processing fails.
1587    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
1588        if let Some(database) = &self.cache.database {
1589            tracing::info!(
1590                "Replaying historical events for pool {} to hydrate profiler",
1591                pool.instrument_id
1592            );
1593
1594            let mut event_stream = database.stream_pool_events(
1595                self.chain.clone(),
1596                dex.clone(),
1597                pool.instrument_id,
1598                &pool.address,
1599                None,
1600            );
1601            let mut event_count = 0;
1602
1603            while let Some(event_result) = event_stream.next().await {
1604                match event_result {
1605                    Ok(event) => {
1606                        let data_event = match event {
1607                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
1608                            DexPoolData::LiquidityUpdate(update) => {
1609                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
1610                            }
1611                            DexPoolData::FeeCollect(collect) => {
1612                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
1613                            }
1614                            DexPoolData::Flash(flash) => {
1615                                DataEvent::DeFi(DefiData::PoolFlash(flash))
1616                            }
1617                        };
1618                        self.send_data(data_event);
1619                        event_count += 1;
1620                    }
1621                    Err(e) => {
1622                        tracing::error!(
1623                            "Error streaming event for pool {}: {e}",
1624                            pool.instrument_id
1625                        );
1626                    }
1627                }
1628            }
1629
1630            tracing::info!(
1631                "Replayed {event_count} historical events for pool {}",
1632                pool.instrument_id
1633            );
1634        } else {
1635            tracing::debug!(
1636                "No database available, skipping event replay for pool {}",
1637                pool.instrument_id
1638            );
1639        }
1640
1641        Ok(())
1642    }
1643
1644    /// Determines the starting block for syncing operations.
1645    fn determine_from_block(&self) -> u64 {
1646        self.config
1647            .from_block
1648            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
1649    }
1650
1651    /// Retrieves extended DEX information for a registered DEX.
1652    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
1653        if !self.cache.get_registered_dexes().contains(dex_id) {
1654            anyhow::bail!("DEX {dex_id} is not registered in the data client");
1655        }
1656
1657        match get_dex_extended(self.chain.name, dex_id) {
1658            Some(dex) => Ok(dex),
1659            None => anyhow::bail!("Dex {dex_id} doesn't exist for chain {}", self.chain.name),
1660        }
1661    }
1662
1663    /// Retrieves a pool from the cache by its address.
1664    ///
1665    /// # Errors
1666    ///
1667    /// Returns an error if the pool is not registered in the cache.
1668    pub fn get_pool(&self, pool_address: &Address) -> anyhow::Result<&SharedPool> {
1669        match self.cache.get_pool(pool_address) {
1670            Some(pool) => Ok(pool),
1671            None => anyhow::bail!("Pool {pool_address} is not registered"),
1672        }
1673    }
1674
1675    /// Sends a data event to all subscribers through the data channel.
1676    pub fn send_data(&self, data: DataEvent) {
1677        if let Some(data_tx) = &self.data_tx {
1678            tracing::debug!("Sending {data}");
1679
1680            if let Err(e) = data_tx.send(data) {
1681                tracing::error!("Failed to send data: {e}");
1682            }
1683        } else {
1684            tracing::error!("No data event channel for sending data");
1685        }
1686    }
1687
1688    /// Disconnects all active connections and cleanup resources.
1689    ///
1690    /// This method should be called when shutting down the client to ensure
1691    /// proper cleanup of network connections and background tasks.
1692    pub async fn disconnect(&mut self) {
1693        self.hypersync_client.disconnect().await;
1694    }
1695}