nautilus_blockchain/data/core.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{cmp::max, collections::HashSet, sync::Arc};

use alloy::primitives::Address;
use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_core::UnixNanos;
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
    SharedPool, Token,
    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash},
};

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::erc20::{Erc20Contract, TokenInfoError},
    data::subscription::DefiDataSubscriptionManager,
    events::{
        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent,
        pool_created::PoolCreatedEvent, swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
};

const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;

/// Core blockchain data client responsible for fetching, processing, and caching blockchain data.
///
/// This struct encapsulates the core functionality for interacting with blockchain networks,
/// including syncing historical data, processing real-time events, and managing cached entities.
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain being targeted by this client instance.
    pub chain: SharedChain,
    /// The configuration for the data client.
    pub config: BlockchainDataClientConfig,
    /// Local cache for blockchain entities.
    pub cache: BlockchainCache,
    /// Interface for interacting with ERC20 token contracts.
    tokens: Erc20Contract,
    /// Client for the HyperSync data indexing service.
    pub hypersync_client: HyperSyncClient,
    /// Optional WebSocket RPC client for direct blockchain node communication.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manages subscriptions for various DEX events (swaps, mints, burns).
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel sender for data events.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
}

impl BlockchainDataClientCore {
    /// Creates a new instance of [`BlockchainDataClientCore`].
    ///
    /// # Panics
    ///
    /// Panics if an RPC client is required (`use_hypersync_for_live_data` is false and
    /// `wss_rpc_url` is set) but the configured blockchain has no RPC client implementation.
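    ///
    /// # Examples
    ///
    /// A minimal construction sketch (illustrative; assumes a populated
    /// `BlockchainDataClientConfig` named `config`):
    ///
    /// ```ignore
    /// let (data_tx, _data_rx) = tokio::sync::mpsc::unbounded_channel();
    /// let core = BlockchainDataClientCore::new(config, None, Some(data_tx));
    /// ```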
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());
        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
        } else {
            None
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client,
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
        }
    }

    /// Initializes the database connection for the blockchain cache.
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            tracing::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

    /// Creates an appropriate blockchain RPC client for the specified blockchain.
    ///
    /// # Panics
    ///
    /// Panics if the blockchain has no RPC client implementation.
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
    ) -> BlockchainRpcClientAny {
        match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
            }
            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        }
    }

    /// Establishes connections to all configured data sources and initializes the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if cache initialization or connection setup fails.
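    ///
    /// # Examples
    ///
    /// An illustrative sketch (assumes `core` was built with
    /// [`BlockchainDataClientCore::new`]):
    ///
    /// ```ignore
    /// core.connect().await?;
    /// ```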
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        tracing::info!(
            "Connecting to blockchain data source for '{chain_name}' from block {from_block}",
            chain_name = self.chain.name
        );

        // Initialize the chain and register the DEX exchanges in the cache.
        self.cache.initialize_chain().await;
        // Import the cached blockchain data.
        self.cache.connect(from_block).await?;
        // TODO: Block syncing is disabled for now because block timestamps are not yet configured.
        // Sync the remaining blocks which are missing:
        // self.sync_blocks(Some(from_block), None).await?;
        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

    /// Syncs blocks with consistency checks to ensure data integrity.
    ///
    /// # Errors
    ///
    /// Returns an error if block syncing fails or if consistency checks fail.
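    ///
    /// # Examples
    ///
    /// An illustrative sketch (block numbers are hypothetical): sync from block
    /// 10_000_000 up to the current chain head.
    ///
    /// ```ignore
    /// core.sync_blocks_checked(10_000_000, None).await?;
    /// ```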
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            // If blocks are consistent, proceed with the COPY command.
            if blocks_status.is_consistent() {
                tracing::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                tracing::info!("Starting fast sync with COPY from block {}", target_block);
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                tracing::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                tracing::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                tracing::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    blocks_status.max_block + 1
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
    ///
    /// # Errors
    ///
    /// Returns an error if block fetching, caching, or database operations fail.
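    ///
    /// # Examples
    ///
    /// An illustrative sketch (block numbers are hypothetical): backfill an explicit
    /// range using the COPY fast path.
    ///
    /// ```ignore
    /// core.sync_blocks(10_000_000, Some(10_050_000), true).await?;
    /// ```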
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        let to_block = if let Some(block) = to_block {
            block
        } else {
            self.hypersync_client.current_block().await
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        tracing::info!(
            "Syncing blocks from {from_block} to {to_block} (total: {total_blocks} blocks)"
        );

        // Enable performance settings for sync operations.
        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        // Batch configuration
        const BATCH_SIZE: usize = 1000;
        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        while let Some(block) = blocks_stream.next().await {
            let block_number = block.number;
            if self.cache.get_block_timestamp(block_number).is_some() {
                continue;
            }
            batch.push(block);

            // Process the batch when it is full or we have reached the last block.
            if batch.len() >= BATCH_SIZE || block_number >= to_block {
                let batch_size = batch.len();

                self.cache.add_blocks_batch(batch, use_copy_command).await?;
                metrics.update(batch_size);

                // Re-initialize the batch vector.
                batch = Vec::with_capacity(BATCH_SIZE);
            }

            // Log progress if needed.
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        // Process any remaining blocks.
        if !batch.is_empty() {
            let batch_size = batch.len();
            self.cache.add_blocks_batch(batch, use_copy_command).await?;
            metrics.update(batch_size);
        }

        metrics.log_final_stats();

        // Restore default safe settings after sync completion.
        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

    /// Synchronizes all events for a specific pool within the given block range.
    ///
    /// # Errors
    ///
    /// Returns an error if event syncing, parsing, or database operations fail.
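    ///
    /// # Examples
    ///
    /// An illustrative sketch (the DEX variant and pool address are hypothetical): sync a
    /// single pool's events from its creation block to the chain head.
    ///
    /// ```ignore
    /// let pool_address: Address = "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8".parse()?;
    /// core.sync_pool_events(&DexType::UniswapV3, pool_address, None, None, false)
    ///     .await?;
    /// ```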
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_address: Address,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let pool: SharedPool = self.get_pool(&pool_address)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, &pool_address)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Skip the sync if we're already up to date.
        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        // Query the max block across the pool event tables: use batched INSERTs up to that
        // point, then switch to the COPY command.
        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(&pool_address)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let flash_event_signature = dex_extended.flash_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        // Pre-decode event signatures to bytes for efficient comparison.
        let swap_sig_bytes = hex::decode(
            swap_event_signature
                .strip_prefix("0x")
                .unwrap_or(swap_event_signature),
        )?;
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let collect_sig_bytes = hex::decode(
            collect_event_signature
                .strip_prefix("0x")
                .unwrap_or(collect_event_signature),
        )?;
        let flash_sig_bytes = flash_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
        let initialize_sig_bytes = initialize_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];
        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }
        if let Some(event) = dex_extended.flash_created_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                &pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        // Batch configuration for events
        const EVENT_BATCH_SIZE: usize = 20_000;
        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);

        // Track when we've moved beyond stale data and can use COPY.
        let mut beyond_stale_data = last_block_across_pool_events_table
            .map_or(true, |tables_max| effective_from_block > tables_max);
        while let Some(log) = pool_events_stream.next().await {
            let block_number = extract_block_number(&log)?;
            blocks_processed += block_number - last_block_saved;
            last_block_saved = block_number;

            // Compare optional signatures in the branch guard so that a mismatched flash
            // signature still falls through to the initialize and fallback arms.
            let event_sig_bytes = extract_event_signature_bytes(&log)?;
            if event_sig_bytes == swap_sig_bytes.as_slice() {
                let swap_event = dex_extended.parse_swap_event(log)?;
                match self.process_pool_swap_event(&swap_event, &pool, &dex_extended) {
                    Ok(swap) => swap_batch.push(swap),
                    Err(e) => tracing::error!("Failed to process swap event: {e}"),
                }
            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                let mint_event = dex_extended.parse_mint_event(log)?;
                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                let burn_event = dex_extended.parse_burn_event(log)?;
                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
                }
            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                let collect_event = dex_extended.parse_collect_event(log)?;
                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                    Ok(fee_collect) => collect_batch.push(fee_collect),
                    Err(e) => tracing::error!("Failed to process collect event: {e}"),
                }
            } else if flash_sig_bytes
                .as_deref()
                .is_some_and(|sig| event_sig_bytes == sig)
            {
                if let Some(parse_fn) = dex_extended.parse_flash_event_fn {
                    match parse_fn(dex_extended.dex.clone(), log) {
                        Ok(flash_event) => {
                            match self.process_pool_flash_event(&flash_event, &pool) {
                                Ok(flash) => flash_batch.push(flash),
                                Err(e) => tracing::error!("Failed to process flash event: {e}"),
                            }
                        }
                        Err(e) => tracing::error!("Failed to parse flash event: {e}"),
                    }
                }
            } else if initialize_sig_bytes
                .as_deref()
                .is_some_and(|sig| event_sig_bytes == sig)
            {
                let initialize_event = dex_extended.parse_initialize_event(log)?;
                self.cache
                    .update_pool_initialize_price_tick(&initialize_event)
                    .await?;
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                tracing::error!(
                    "Unexpected event signature: {} for log {:?}",
                    event_signature,
                    log
                );
            }

            // Check if we've moved beyond stale data (the transition point for the strategy change).
            if !beyond_stale_data
                && last_block_across_pool_events_table
                    .map_or(false, |table_max| block_number > table_max)
            {
                tracing::info!(
                    "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
                    block_number
                );

                // Flush all batches with ON CONFLICT to handle any remaining duplicates.
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    false,
                    true,
                )
                .await?;

                beyond_stale_data = true;
                tracing::info!("Switched to COPY mode - future batches will use COPY command");
            } else {
                // Process batches when they reach the batch size.
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    false, // TODO: Temporarily avoid the COPY command
                    false,
                )
                .await?;
            }

            metrics.update(blocks_processed as usize);
            blocks_processed = 0;

            // Log progress if needed.
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
                self.cache
                    .update_pool_last_synced_block(dex, &pool_address, block_number)
                    .await?;
            }
        }

        self.flush_event_batches(
            EVENT_BATCH_SIZE,
            &mut swap_batch,
            &mut liquidity_batch,
            &mut collect_batch,
            &mut flash_batch,
            false,
            true,
        )
        .await?;

        metrics.log_final_stats();
        self.cache
            .update_pool_last_synced_block(dex, &pool_address, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX '{}' Pool '{}' up to block {}",
            dex,
            pool_display,
            to_block
        );
        Ok(())
    }

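    /// Flushes the buffered event batches to the cache.
    ///
    /// Each batch is written once it reaches `event_batch_size`, or unconditionally when
    /// `force_flush_all` is set; `use_copy_command` selects COPY over batched INSERTs.
    ///
    /// # Errors
    ///
    /// Returns an error if writing any batch to the cache fails.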
    async fn flush_event_batches(
        &mut self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        flash_batch: &mut Vec<PoolFlash>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }
        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
            && !liquidity_batch.is_empty()
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }
        if (force_flush_all || collect_batch.len() >= event_batch_size)
            && !collect_batch.is_empty()
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }
        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
            self.cache.add_pool_flash_batch(flash_batch).await?;
            flash_batch.clear();
        }
        Ok(())
    }

    /// Processes a swap event and converts it to a `PoolSwap`.
    ///
    /// # Errors
    ///
    /// Returns an error if swap event processing fails.
    ///
    /// # Panics
    ///
    /// Panics if swap event conversion to trade data fails.
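    ///
    /// # Examples
    ///
    /// An illustrative sketch (assumes a parsed `swap_event`, a cached `pool`, and its
    /// `dex_extended` definition):
    ///
    /// ```ignore
    /// let swap = core.process_pool_swap_event(&swap_event, &pool, &dex_extended)?;
    /// ```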
    pub fn process_pool_swap_event(
        &self,
        swap_event: &SwapEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolSwap> {
        let timestamp = self
            .cache
            .get_block_timestamp(swap_event.block_number)
            .copied();
        let (side, size, price) =
            dex_extended.convert_to_trade_data(&pool.token0, &pool.token1, swap_event)?;
        let swap = swap_event.to_pool_swap(
            self.chain.clone(),
            pool.instrument_id,
            pool.address,
            Some(side),
            Some(size),
            Some(price),
            timestamp,
        );

        // TODO: Add caching and persisting of swaps; resolve block timestamp syncing.
        // self.cache.add_pool_swap(&swap).await?;

        Ok(swap)
    }

    /// Processes a mint event (liquidity addition) and converts it to a `PoolLiquidityUpdate`.
    ///
    /// # Errors
    ///
    /// Returns an error if mint event processing fails or if the liquidity update creation fails.
    pub fn process_pool_mint_event(
        &self,
        mint_event: &MintEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(mint_event.block_number)
            .copied();

        let liquidity_update = mint_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        // self.cache.add_liquidity_update(&liquidity_update).await?;

        Ok(liquidity_update)
    }

    /// Processes a burn event (liquidity removal) and converts it to a `PoolLiquidityUpdate`.
    ///
    /// # Errors
    ///
    /// Returns an error if the burn event processing fails or if the liquidity update creation fails.
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        // self.cache.add_liquidity_update(&liquidity_update).await?;

        Ok(liquidity_update)
    }

    /// Processes a pool collect event and converts it to a fee collection.
    ///
    /// # Errors
    ///
    /// Returns an error if the collect event processing fails or if the fee collection creation fails.
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(fee_collect)
    }

    /// Processes a pool flash event and converts it to a flash loan.
    ///
    /// # Errors
    ///
    /// Returns an error if the flash event processing fails or if the flash loan creation fails.
    pub fn process_pool_flash_event(
        &self,
        flash_event: &FlashEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolFlash> {
        let timestamp = self
            .cache
            .get_block_timestamp(flash_event.block_number)
            .copied();

        let flash = flash_event.to_pool_flash(
            self.chain.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(flash)
    }

    /// Synchronizes all pools and their tokens for a specific DEX within the given block range.
    ///
    /// This method performs a comprehensive sync of:
    /// 1. Pool creation events from the DEX factory
    /// 2. Token metadata for all tokens in discovered pools
    /// 3. Pool entities with proper token associations
    ///
    /// # Errors
    ///
    /// Returns an error if syncing pools, tokens, or DEX operations fail.
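    ///
    /// # Examples
    ///
    /// An illustrative sketch (the DEX variant and block number are hypothetical): discover
    /// all pools created since block 12_000_000.
    ///
    /// ```ignore
    /// core.sync_exchange_pools(&DexType::UniswapV3, 12_000_000, None, false)
    ///     .await?;
    /// ```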
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        // Check for the last synced block and use it as the starting point if higher (unless `reset` is true).
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(dex).await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Skip the sync if we're already up to date.
        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let dex = self.get_dex_extended(dex)?.clone();
        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        tokio::pin!(pools_stream);

        // Derive the token batch size by dividing the max multicall calls by 3, as each token
        // yields 3 calls (name, decimals, symbol).
        let token_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
        const POOL_BATCH_SIZE: usize = 1000;
        let mut token_buffer: HashSet<Address> = HashSet::new();
        let mut pool_buffer: Vec<PoolCreatedEvent> = Vec::new();
        let mut last_block_saved = effective_from_block;

        while let Some(log) = pools_stream.next().await {
            let block_number = extract_block_number(&log)?;
            let blocks_progress = block_number - last_block_saved;
            last_block_saved = block_number;

            let pool = dex.parse_pool_created_event(log)?;
            if self.cache.get_pool(&pool.pool_address).is_some() {
                // Pool is already initialized and cached.
                continue;
            }

            if self.cache.is_invalid_token(&pool.token0)
                || self.cache.is_invalid_token(&pool.token1)
            {
                // Skip pools with invalid tokens as they cannot be properly processed or traded.
                continue;
            }

            if self.cache.get_token(&pool.token0).is_none() {
                token_buffer.insert(pool.token0);
            }
            if self.cache.get_token(&pool.token1).is_none() {
                token_buffer.insert(pool.token1);
            }
            // Buffer the pool for later processing.
            pool_buffer.push(pool);

            if token_buffer.len() >= token_batch_size || pool_buffer.len() >= POOL_BATCH_SIZE {
                self.flush_tokens_and_process_pools(
                    &mut token_buffer,
                    &mut pool_buffer,
                    dex.dex.clone(),
                )
                .await?;
            }

            metrics.update(blocks_progress as usize);
            // Log progress if needed.
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        if !token_buffer.is_empty() || !pool_buffer.is_empty() {
            self.flush_tokens_and_process_pools(
                &mut token_buffer,
                &mut pool_buffer,
                dex.dex.clone(),
            )
            .await?;
        }

        metrics.log_final_stats();

        // Update the last synced block after successful completion.
        self.cache
            .update_dex_last_synced_block(&dex.dex.name, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX {} pools up to block {}",
            dex.dex.name,
            to_block
        );

        Ok(())
    }

    /// Processes buffered tokens and their associated pools in batch.
    ///
    /// This helper method:
    /// 1. Fetches token metadata for all buffered token addresses
    /// 2. Caches valid tokens and tracks invalid ones
    /// 3. Processes pools, skipping those with invalid tokens
    async fn flush_tokens_and_process_pools(
        &mut self,
        token_buffer: &mut HashSet<Address>,
        pool_buffer: &mut Vec<PoolCreatedEvent>,
        dex: SharedDex,
    ) -> anyhow::Result<()> {
        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
        let token_infos = self.tokens.batch_fetch_token_info(&batch_addresses).await?;

        let mut empty_tokens = HashSet::new();
        // Cache both multicall failures and decoding errors so the affected pools can be skipped.
        let mut decoding_errors_tokens = HashSet::new();

        for (token_address, token_info) in token_infos {
            match token_info {
                Ok(token) => {
                    let token = Token::new(
                        self.chain.clone(),
                        token_address,
                        token.name,
                        token.symbol,
                        token.decimals,
                    );
                    self.cache.add_token(token).await?;
                }
                Err(token_info_error) => match token_info_error {
                    TokenInfoError::EmptyTokenField { .. } => {
                        empty_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    TokenInfoError::DecodingError { .. } => {
                        decoding_errors_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    TokenInfoError::CallFailed { .. } => {
                        decoding_errors_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    _ => {
                        tracing::error!(
                            "Error fetching token info: {}",
                            token_info_error.to_string()
                        );
                    }
                },
            }
        }
        let mut pools = Vec::new();
        for pool_event in &mut *pool_buffer {
            // Skip pools containing a token flagged with empty fields or a decoding error.
            if empty_tokens.contains(&pool_event.token0)
                || empty_tokens.contains(&pool_event.token1)
                || decoding_errors_tokens.contains(&pool_event.token0)
                || decoding_errors_tokens.contains(&pool_event.token1)
            {
                continue;
            }

            match self.construct_pool(dex.clone(), pool_event).await {
                Ok(pool) => pools.push(pool),
                Err(e) => tracing::error!(
                    "Failed to process {} with error {}",
                    pool_event.pool_address,
                    e
                ),
            }
        }

        self.cache.add_pools_batch(pools).await?;
        pool_buffer.clear();

        Ok(())
    }

    /// Constructs a new `Pool` entity from a pool creation event with full token validation.
    ///
    /// Validates that both tokens are present in the cache and creates a properly
    /// initialized pool entity with all required metadata including DEX, tokens, fees, and block information.
    ///
    /// # Errors
    ///
    /// Returns an error if either token is not found in the cache, indicating incomplete token synchronization.
    async fn construct_pool(
        &mut self,
        dex: SharedDex,
        event: &PoolCreatedEvent,
    ) -> anyhow::Result<Pool> {
        let token0 = match self.cache.get_token(&event.token0) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token0);
            }
        };
        let token1 = match self.cache.get_token(&event.token1) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token1);
            }
        };

        Ok(Pool::new(
            self.chain.clone(),
            dex,
            event.pool_address,
            event.block_number,
            token0,
            token1,
            event.fee,
            event.tick_spacing,
            UnixNanos::default(), // TODO: Use default timestamp for now
        ))
    }

    /// Registers a decentralized exchange for data collection and event monitoring.
    ///
    /// Registration involves:
    /// 1. Adding the DEX to the cache
    /// 2. Loading existing pools for the DEX
    /// 3. Configuring event signatures for subscriptions
    ///
    /// # Errors
    ///
    /// Returns an error if DEX registration, cache operations, or pool loading fails.
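    ///
    /// # Examples
    ///
    /// An illustrative sketch (the DEX variant is hypothetical for the configured chain):
    ///
    /// ```ignore
    /// core.register_dex_exchange(DexType::UniswapV3).await?;
    /// ```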
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            let _ = self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
                dex_extended.collect_created_event.as_ref(),
                dex_extended.flash_created_event.as_deref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

    /// Replays historical events for a pool to hydrate its profiler state.
    ///
    /// Streams all historical swap, liquidity, and fee collect events from the database
    /// and sends them through the normal data event pipeline to build up pool profiler state.
    ///
    /// # Errors
    ///
    /// Returns an error if database streaming fails or event processing fails.
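    ///
    /// # Examples
    ///
    /// An illustrative sketch (assumes `pool` and `dex` were loaded from the cache):
    ///
    /// ```ignore
    /// core.replay_pool_events(&pool, &dex).await?;
    /// ```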
    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
        if let Some(database) = &self.cache.database {
            tracing::info!(
                "Replaying historical events for pool {} to hydrate profiler",
                pool.instrument_id
            );

            let mut event_stream = database.stream_pool_events(
                self.chain.clone(),
                dex.clone(),
                pool.instrument_id,
                &pool.address,
                None,
            );
            let mut event_count = 0;

            while let Some(event_result) = event_stream.next().await {
                match event_result {
                    Ok(event) => {
                        let data_event = match event {
                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
                            DexPoolData::LiquidityUpdate(update) => {
                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
                            }
                            DexPoolData::FeeCollect(collect) => {
                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
                            }
                            DexPoolData::Flash(flash) => {
                                DataEvent::DeFi(DefiData::PoolFlash(flash))
                            }
                        };
                        self.send_data(data_event);
                        event_count += 1;
                    }
                    Err(e) => {
                        tracing::error!(
                            "Error streaming event for pool {}: {e}",
                            pool.instrument_id
                        );
                    }
                }
            }

            tracing::info!(
                "Replayed {event_count} historical events for pool {}",
                pool.instrument_id
            );
        } else {
            tracing::debug!(
                "No database available, skipping event replay for pool {}",
                pool.instrument_id
            );
        }

        Ok(())
    }

    /// Determines the starting block for syncing operations.
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

    /// Retrieves extended DEX information for a registered DEX.
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("DEX {dex_id} doesn't exist for chain {}", self.chain.name),
        }
    }

    /// Retrieves a pool from the cache by its address.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool is not registered in the cache.
    pub fn get_pool(&self, pool_address: &Address) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_address) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_address} is not registered"),
        }
    }

    /// Sends a data event to all subscribers through the data channel.
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            tracing::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                tracing::error!("Failed to send data: {e}");
            }
        } else {
            tracing::error!("No data event channel for sending data");
        }
    }

    /// Disconnects all active connections and cleans up resources.
    ///
    /// This method should be called when shutting down the client to ensure
    /// proper cleanup of network connections and background tasks.
    pub fn disconnect(&mut self) {
        self.hypersync_client.disconnect();
    }
}