nautilus_blockchain/data/core.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{cmp::max, collections::HashSet, sync::Arc};

use alloy::primitives::{Address, U256};
use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_core::UnixNanos;
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
    SharedPool, Token, data::PoolFeeCollect,
};

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::erc20::{Erc20Contract, TokenInfoError},
    data::subscription::DefiDataSubscriptionManager,
    decode::u256_to_quantity,
    events::{
        burn::BurnEvent, collect::CollectEvent, mint::MintEvent, pool_created::PoolCreatedEvent,
        swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
};

/// Number of blocks processed between sync progress log reports.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50000;

/// Core blockchain data client responsible for fetching, processing, and caching blockchain data.
///
/// This struct encapsulates the core functionality for interacting with blockchain networks,
/// including syncing historical data, processing real-time events, and managing cached entities.
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain being targeted by this client instance.
    pub chain: SharedChain,
    /// The configuration for the data client.
    pub config: BlockchainDataClientConfig,
    /// Local cache for blockchain entities.
    pub cache: BlockchainCache,
    /// Interface for interacting with ERC20 token contracts.
    tokens: Erc20Contract,
    /// Client for the HyperSync data indexing service.
    pub hypersync_client: HyperSyncClient,
    /// Optional WebSocket RPC client for direct blockchain node communication.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manages subscriptions for various DEX events (swaps, mints, burns).
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel sender for data events.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
}

impl BlockchainDataClientCore {
    /// Creates a new instance of [`BlockchainDataClientCore`].
    ///
    /// # Panics
    ///
    /// Panics if a WebSocket RPC client must be created for an unsupported blockchain.
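    ///
    /// A minimal usage sketch (assumes a fully populated
    /// [`BlockchainDataClientConfig`] named `config` built elsewhere; the block is
    /// `ignore`d because connecting requires live network and database access):
    ///
    /// ```ignore
    /// let (data_tx, mut data_rx) = tokio::sync::mpsc::unbounded_channel();
    /// let mut core = BlockchainDataClientCore::new(config, None, Some(data_tx));
    /// core.connect().await?;
    /// ```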
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());
        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
        } else {
            None
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client,
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
        }
    }

    /// Initializes the database connection for the blockchain cache.
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            tracing::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

    /// Creates an appropriate blockchain RPC client for the specified blockchain.
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
    ) -> BlockchainRpcClientAny {
        match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
            }
            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        }
    }

    /// Establishes connections to all configured data sources and initializes the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if cache initialization or connection setup fails.
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        tracing::info!(
            "Connecting to blockchain data source for '{chain_name}' from block {from_block}",
            chain_name = self.chain.name
        );

        // Initialize the chain and register the DEX exchanges in the cache.
        self.cache.initialize_chain().await;
        // Import the cached blockchain data.
        self.cache.connect(from_block).await?;
        // TODO: Block syncing is disabled for now as block timestamps are not yet configured.
        // Sync the remaining missing blocks:
        // self.sync_blocks(Some(from_block), None).await?;
        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

    /// Syncs blocks with consistency checks to ensure data integrity.
    ///
    /// # Errors
    ///
    /// Returns an error if block syncing fails or if consistency checks fail.
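    ///
    /// For example (illustrative numbers): if the cache reports
    /// `last_continuous_block = 100` and `max_block = 150`, phase 1 back-fills
    /// blocks 101..=150 with INSERT (which tolerates duplicates), and phase 2
    /// resumes the fast COPY path from block 151.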
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            // If blocks are consistent, proceed with the COPY command.
            if blocks_status.is_consistent() {
                tracing::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                tracing::info!("Starting fast sync with COPY from block {}", target_block);
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                tracing::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                tracing::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                tracing::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    blocks_status.max_block + 1
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
    ///
    /// # Errors
    ///
    /// Returns an error if block fetching, caching, or database operations fail.
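    ///
    /// A usage sketch (block numbers are illustrative and `core` is assumed to be a
    /// connected [`BlockchainDataClientCore`]; `ignore`d because it streams live data):
    ///
    /// ```ignore
    /// // Fast-sync from block 1_000_000 to the current chain head via COPY:
    /// core.sync_blocks(1_000_000, None, true).await?;
    ///
    /// // Back-fill a bounded gap with plain INSERTs instead:
    /// core.sync_blocks(900_000, Some(999_999), false).await?;
    /// ```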
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        let to_block = if let Some(block) = to_block {
            block
        } else {
            self.hypersync_client.current_block().await
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        tracing::info!(
            "Syncing blocks from {from_block} to {to_block} (total: {total_blocks} blocks)"
        );

        // Enable performance settings for sync operations.
        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        // Batch configuration
        const BATCH_SIZE: usize = 1000;
        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        while let Some(block) = blocks_stream.next().await {
            let block_number = block.number;
            if self.cache.get_block_timestamp(block_number).is_some() {
                continue;
            }
            batch.push(block);

            // Process the batch when full or when the last block is reached.
            if batch.len() >= BATCH_SIZE || block_number >= to_block {
                let batch_size = batch.len();

                self.cache.add_blocks_batch(batch, use_copy_command).await?;
                metrics.update(batch_size);

                // Re-initialize the batch vector.
                batch = Vec::with_capacity(BATCH_SIZE);
            }

            // Log progress if needed.
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        // Process any remaining blocks.
        if !batch.is_empty() {
            let batch_size = batch.len();
            self.cache.add_blocks_batch(batch, use_copy_command).await?;
            metrics.update(batch_size);
        }

        metrics.log_final_stats();

        // Restore default safe settings after sync completion.
        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

    /// Synchronizes all events for a specific pool within the given block range.
    ///
    /// # Errors
    ///
    /// Returns an error if event syncing, parsing, or database operations fail.
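    ///
    /// A usage sketch (the pool address and DEX variant are illustrative; the pool
    /// must already be registered in the cache, and the block is `ignore`d because
    /// it streams live data):
    ///
    /// ```ignore
    /// let pool_address: Address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8".parse()?;
    /// core.sync_pool_events(&DexType::UniswapV3, pool_address, None, None, false)
    ///     .await?;
    /// ```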
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_address: Address,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let pool: SharedPool = self.get_pool(&pool_address)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, &pool_address)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Skip sync if we're already up to date.
        if effective_from_block > to_block {
            tracing::info!(
                "Pool '{}' on DEX {} already synced to block {} (current: {}), skipping sync",
                pool_display,
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        // Query the max block across the pool event tables: batched INSERTs are used up
        // to that block (to tolerate duplicates), then the COPY command takes over.
        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(&pool_address)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        // Pre-decode event signatures to bytes for efficient comparison.
        let decode_sig = |sig: &str| hex::decode(sig.strip_prefix("0x").unwrap_or(sig));
        let swap_sig_bytes = decode_sig(swap_event_signature)?;
        let mint_sig_bytes = decode_sig(mint_event_signature)?;
        let burn_sig_bytes = decode_sig(burn_event_signature)?;
        let collect_sig_bytes = decode_sig(collect_event_signature)?;
        // Propagate a malformed initialize signature as an error instead of silently
        // decoding it to an empty byte string (which could never match a real event).
        let initialize_sig_bytes = initialize_event_signature.map(decode_sig).transpose()?;

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];
        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                &pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        // Batch configuration for events
        const EVENT_BATCH_SIZE: usize = 20000;
        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);

        // Track when we've moved beyond stale data and can use COPY.
        let mut beyond_stale_data = last_block_across_pool_events_table
            .map_or(true, |tables_max| effective_from_block > tables_max);
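        // For example (illustrative numbers): if the event tables already hold data up
        // to block 20_000_000 and this sync starts at 19_500_000, batches are INSERTed
        // with ON CONFLICT until the stream passes 20_000_000, then COPY takes over.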
        while let Some(log) = pool_events_stream.next().await {
            let block_number = extract_block_number(&log)?;
            blocks_processed += block_number - last_block_saved;
            last_block_saved = block_number;

            let event_sig_bytes = extract_event_signature_bytes(&log)?;
            if event_sig_bytes == swap_sig_bytes.as_slice() {
                let swap_event = dex_extended.parse_swap_event(log)?;
                match self.process_pool_swap_event(&swap_event, &pool, &dex_extended) {
                    Ok(swap) => swap_batch.push(swap),
                    Err(e) => tracing::error!("Failed to process swap event: {e}"),
                }
            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                let mint_event = dex_extended.parse_mint_event(log)?;
                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                let burn_event = dex_extended.parse_burn_event(log)?;
                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
                }
            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                let collect_event = dex_extended.parse_collect_event(log)?;
                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                    Ok(fee_collect) => collect_batch.push(fee_collect),
                    Err(e) => tracing::error!("Failed to process collect event: {e}"),
                }
            } else if initialize_sig_bytes
                .as_deref()
                .is_some_and(|sig| event_sig_bytes == sig)
            {
                let initialize_event = dex_extended.parse_initialize_event(log)?;
                self.cache
                    .update_pool_initialize_price_tick(&initialize_event)
                    .await?;
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                tracing::error!(
                    "Unexpected event signature: {} for log {:?}",
                    event_signature,
                    log
                );
            }

            // Check if we've moved beyond stale data (transition point for strategy change).
            if !beyond_stale_data
                && last_block_across_pool_events_table
                    .is_some_and(|table_max| block_number > table_max)
            {
                tracing::info!(
                    "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
                    block_number
                );

                // Flush all batches with ON CONFLICT to handle any remaining duplicates.
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    false,
                    true,
                )
                .await?;

                beyond_stale_data = true;
                tracing::info!("Switched to COPY mode - future batches will use COPY command");
            } else {
                // Process batches when they reach batch size.
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    beyond_stale_data,
                    false,
                )
                .await?;
            }

            metrics.update(blocks_processed as usize);
            blocks_processed = 0;

            // Log progress if needed.
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
                self.cache
                    .update_pool_last_synced_block(dex, &pool_address, block_number)
                    .await?;
            }
        }

        self.flush_event_batches(
            EVENT_BATCH_SIZE,
            &mut swap_batch,
            &mut liquidity_batch,
            &mut collect_batch,
            beyond_stale_data,
            true,
        )
        .await?;

        metrics.log_final_stats();
        self.cache
            .update_pool_last_synced_block(dex, &pool_address, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX '{}' Pool '{}' up to block {}",
            dex,
            pool_display,
            to_block
        );
        Ok(())
    }

    /// Flushes the buffered event batches to the cache.
    ///
    /// Each batch is written once it reaches `event_batch_size`, or unconditionally
    /// when `force_flush_all` is set (e.g. at stream end or when switching insert strategy).
    async fn flush_event_batches(
        &mut self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }
        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
            && !liquidity_batch.is_empty()
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }
        if (force_flush_all || collect_batch.len() >= event_batch_size)
            && !collect_batch.is_empty()
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }
        Ok(())
    }

    /// Processes a swap event and converts it to a `PoolSwap`.
    ///
    /// # Errors
    ///
    /// Returns an error if swap event processing fails.
    ///
    /// # Panics
    ///
    /// Panics if swap event conversion to trade data fails.
    pub fn process_pool_swap_event(
        &self,
        swap_event: &SwapEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolSwap> {
        let timestamp = self
            .cache
            .get_block_timestamp(swap_event.block_number)
            .copied();

        let (side, size, price) = dex_extended
            .convert_to_trade_data(&pool.token0, &pool.token1, swap_event)
            .expect("Failed to convert swap event to trade data");
        let swap = swap_event.to_pool_swap(
            self.chain.clone(),
            pool.instrument_id,
            pool.address,
            side,
            size,
            price,
            timestamp,
        );

        // TODO: Add caching and persisting of swaps once block timestamp syncing is resolved.
        // self.cache.add_pool_swap(&swap).await?;

        Ok(swap)
    }

    /// Processes a mint event (liquidity addition) and converts it to a `PoolLiquidityUpdate`.
    ///
    /// # Errors
    ///
    /// Returns an error if mint event processing fails or if the liquidity update creation fails.
    pub fn process_pool_mint_event(
        &self,
        mint_event: &MintEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(mint_event.block_number)
            .copied();
        let liquidity = u256_to_quantity(
            U256::from(mint_event.amount),
            self.chain.native_currency_decimals,
        )?;
        let amount0 = u256_to_quantity(mint_event.amount0, pool.token0.decimals)?;
        let amount1 = u256_to_quantity(mint_event.amount1, pool.token1.decimals)?;

        let liquidity_update = mint_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            liquidity,
            amount0,
            amount1,
            timestamp,
        );

        // self.cache.add_liquidity_update(&liquidity_update).await?;

        Ok(liquidity_update)
    }

    /// Processes a burn event (liquidity removal) and converts it to a `PoolLiquidityUpdate`.
    ///
    /// # Errors
    ///
    /// Returns an error if the burn event processing fails or if the liquidity update creation fails.
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();
        let liquidity = u256_to_quantity(
            U256::from(burn_event.amount),
            self.chain.native_currency_decimals,
        )?;
        let amount0 = u256_to_quantity(burn_event.amount0, pool.token0.decimals)?;
        let amount1 = u256_to_quantity(burn_event.amount1, pool.token1.decimals)?;

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            liquidity,
            amount0,
            amount1,
            timestamp,
        );

        // self.cache.add_liquidity_update(&liquidity_update).await?;

        Ok(liquidity_update)
    }

    /// Processes a pool collect event and converts it to a fee collection.
    ///
    /// # Errors
    ///
    /// Returns an error if the collect event processing fails or if the fee collection creation fails.
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();
        let fee0 = u256_to_quantity(collect_event.amount0, pool.token0.decimals)?;
        let fee1 = u256_to_quantity(collect_event.amount1, pool.token1.decimals)?;

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            fee0,
            fee1,
            timestamp,
        );

        Ok(fee_collect)
    }

    /// Synchronizes all pools and their tokens for a specific DEX within the given block range.
    ///
    /// This method performs a comprehensive sync of:
    /// 1. Pool creation events from the DEX factory
    /// 2. Token metadata for all tokens in discovered pools
    /// 3. Pool entities with proper token associations
    ///
    /// # Errors
    ///
    /// Returns an error if syncing pools, tokens, or DEX operations fail.
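    ///
    /// A usage sketch (the DEX variant and starting block are illustrative; `core`
    /// is assumed connected and the block is `ignore`d because it streams live
    /// factory events):
    ///
    /// ```ignore
    /// core.register_dex_exchange(DexType::UniswapV3).await?;
    /// core.sync_exchange_pools(&DexType::UniswapV3, 12_000_000, None, false)
    ///     .await?;
    /// ```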
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        // Check for the last synced block and use it as the starting point if higher (unless reset is true).
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(dex).await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Skip sync if we're already up to date.
        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let dex = self.get_dex_extended(dex)?.clone();
        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        tokio::pin!(pools_stream);

        // Derive the token batch size by dividing the max multicall calls per RPC request
        // by 3, since each token yields three calls (name, symbol, decimals).
        let token_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
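        // For example (illustrative): with `multicall_calls_per_rpc_request = 300`,
        // up to 100 tokens are fetched per multicall batch.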
        const POOL_BATCH_SIZE: usize = 1000;
        let mut token_buffer: HashSet<Address> = HashSet::new();
        let mut pool_buffer: Vec<PoolCreatedEvent> = Vec::new();
        let mut last_block_saved = effective_from_block;

        while let Some(log) = pools_stream.next().await {
            let block_number = extract_block_number(&log)?;
            let blocks_progress = block_number - last_block_saved;
            last_block_saved = block_number;

            let pool = dex.parse_pool_created_event(log)?;
            if self.cache.get_pool(&pool.pool_address).is_some() {
                // Pool is already initialized and cached.
                continue;
            }

            if self.cache.is_invalid_token(&pool.token0)
                || self.cache.is_invalid_token(&pool.token1)
            {
                // Skip pools with invalid tokens as they cannot be properly processed or traded.
                continue;
            }

            if self.cache.get_token(&pool.token0).is_none() {
                token_buffer.insert(pool.token0);
            }
            if self.cache.get_token(&pool.token1).is_none() {
                token_buffer.insert(pool.token1);
            }
            // Buffer the pool for later processing.
            pool_buffer.push(pool);

            if token_buffer.len() >= token_batch_size || pool_buffer.len() >= POOL_BATCH_SIZE {
                self.flush_tokens_and_process_pools(
                    &mut token_buffer,
                    &mut pool_buffer,
                    dex.dex.clone(),
                )
                .await?;
            }

            metrics.update(blocks_progress as usize);
            // Log progress if needed.
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        if !token_buffer.is_empty() || !pool_buffer.is_empty() {
            self.flush_tokens_and_process_pools(
                &mut token_buffer,
                &mut pool_buffer,
                dex.dex.clone(),
            )
            .await?;
        }

        metrics.log_final_stats();

        // Update the last synced block after successful completion.
        self.cache
            .update_dex_last_synced_block(&dex.dex.name, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX {} pools up to block {}",
            dex.dex.name,
            to_block
        );

        Ok(())
    }

    /// Processes buffered tokens and their associated pools in batch.
    ///
    /// This helper method:
    /// 1. Fetches token metadata for all buffered token addresses
    /// 2. Caches valid tokens and tracks invalid ones
    /// 3. Processes pools, skipping those with invalid tokens
    async fn flush_tokens_and_process_pools(
        &mut self,
        token_buffer: &mut HashSet<Address>,
        pool_buffer: &mut Vec<PoolCreatedEvent>,
        dex: SharedDex,
    ) -> anyhow::Result<()> {
        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
        let token_infos = self.tokens.batch_fetch_token_info(&batch_addresses).await?;

        let mut empty_tokens = HashSet::new();
        // Multicall failures and decoding errors are both tracked here so the affected pools can be skipped.
        let mut decoding_errors_tokens = HashSet::new();

        for (token_address, token_info) in token_infos {
            match token_info {
                Ok(token) => {
                    let token = Token::new(
                        self.chain.clone(),
                        token_address,
                        token.name,
                        token.symbol,
                        token.decimals,
                    );
                    self.cache.add_token(token).await?;
                }
                Err(token_info_error) => match token_info_error {
                    TokenInfoError::EmptyTokenField { .. } => {
                        empty_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    TokenInfoError::DecodingError { .. } | TokenInfoError::CallFailed { .. } => {
                        decoding_errors_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    _ => {
                        tracing::error!("Error fetching token info: {token_info_error}");
                    }
                },
            }
        }
        let mut pools = Vec::new();
        for pool_event in &mut *pool_buffer {
            // Skip pools containing a token flagged with an empty field or a decoding error.
            if empty_tokens.contains(&pool_event.token0)
                || empty_tokens.contains(&pool_event.token1)
                || decoding_errors_tokens.contains(&pool_event.token0)
                || decoding_errors_tokens.contains(&pool_event.token1)
            {
                continue;
            }

            match self.construct_pool(dex.clone(), pool_event).await {
                Ok(pool) => pools.push(pool),
                Err(e) => tracing::error!(
                    "Failed to process {} with error {}",
                    pool_event.pool_address,
                    e
                ),
            }
        }

        self.cache.add_pools_batch(pools).await?;
        pool_buffer.clear();
        Ok(())
    }

    /// Constructs a new `Pool` entity from a pool creation event with full token validation.
    ///
    /// Validates that both tokens are present in the cache and creates a properly
    /// initialized pool entity with all required metadata including DEX, tokens, fees, and block information.
    ///
    /// # Errors
    ///
    /// Returns an error if either token is not found in the cache, indicating incomplete token synchronization.
    async fn construct_pool(
        &mut self,
        dex: SharedDex,
        event: &PoolCreatedEvent,
    ) -> anyhow::Result<Pool> {
        let token0 = match self.cache.get_token(&event.token0) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token0);
            }
        };
        let token1 = match self.cache.get_token(&event.token1) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token1);
            }
        };

        Ok(Pool::new(
            self.chain.clone(),
            dex,
            event.pool_address,
            event.block_number,
            token0,
            token1,
            event.fee,
            event.tick_spacing,
            UnixNanos::default(), // TODO: Use default timestamp for now
        ))
    }

    /// Registers a decentralized exchange for data collection and event monitoring.
    ///
    /// Registration involves:
    /// 1. Adding the DEX to the cache
    /// 2. Loading existing pools for the DEX
    /// 3. Configuring event signatures for subscriptions
    ///
    /// # Errors
    ///
    /// Returns an error if DEX registration, cache operations, or pool loading fails.
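    ///
    /// A usage sketch (the DEX variant is illustrative and must be supported on the
    /// configured chain):
    ///
    /// ```ignore
    /// core.register_dex_exchange(DexType::UniswapV3).await?;
    /// ```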
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

    /// Determines the starting block for syncing operations.
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

    /// Retrieves extended DEX information for a registered DEX.
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("DEX {dex_id} doesn't exist on chain {}", self.chain.name),
        }
    }

    /// Retrieves a pool from the cache by its address.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool is not registered in the cache.
    pub fn get_pool(&self, pool_address: &Address) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_address) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_address} is not registered"),
        }
    }

    /// Sends a data event to all subscribers through the data channel.
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            tracing::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                tracing::error!("Failed to send data: {e}");
            }
        } else {
            tracing::error!("No data event channel for sending data");
        }
    }

    /// Disconnects all active connections and cleans up resources.
    ///
    /// This method should be called when shutting down the client to ensure
    /// proper cleanup of network connections and background tasks.
    pub fn disconnect(&mut self) {
        self.hypersync_client.disconnect();
    }
}