nautilus_blockchain/data/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp::max, sync::Arc};
17
18use futures_util::StreamExt;
19use nautilus_common::messages::DataEvent;
20use nautilus_model::defi::{
21    Block, Blockchain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolProfiler, PoolSwap,
22    SharedChain, SharedDex, SharedPool,
23    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
25    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
26};
27use thousands::Separable;
28
29use crate::{
30    cache::BlockchainCache,
31    config::BlockchainDataClientConfig,
32    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
33    data::subscription::DefiDataSubscriptionManager,
34    events::{
35        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent, swap::SwapEvent,
36    },
37    exchanges::{extended::DexExtended, get_dex_extended},
38    hypersync::{
39        client::HyperSyncClient,
40        helpers::{extract_block_number, extract_event_signature_bytes},
41    },
42    rpc::{
43        BlockchainRpcClient, BlockchainRpcClientAny,
44        chains::{
45            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
46            polygon::PolygonRpcClient,
47        },
48        http::BlockchainHttpRpcClient,
49        types::BlockchainMessage,
50    },
51    services::PoolDiscoveryService,
52};
53
54const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
55
56/// Core blockchain data client responsible for fetching, processing, and caching blockchain data.
57///
58/// This struct encapsulates the core functionality for interacting with blockchain networks,
59/// including syncing historical data, processing real-time events, and managing cached entities.
60#[derive(Debug)]
61pub struct BlockchainDataClientCore {
62    /// The blockchain being targeted by this client instance.
63    pub chain: SharedChain,
64    /// The configuration for the data client.
65    pub config: BlockchainDataClientConfig,
66    /// Local cache for blockchain entities.
67    pub cache: BlockchainCache,
68    /// Interface for interacting with ERC20 token contracts.
69    tokens: Erc20Contract,
70    /// Interface for interacting with UniswapV3 pool contracts.
71    univ3_pool: UniswapV3PoolContract,
72    /// Client for the HyperSync data indexing service.
73    pub hypersync_client: HyperSyncClient,
74    /// Optional WebSocket RPC client for direct blockchain node communication.
75    pub rpc_client: Option<BlockchainRpcClientAny>,
76    /// Manages subscriptions for various DEX events (swaps, mints, burns).
77    pub subscription_manager: DefiDataSubscriptionManager,
78    /// Channel sender for data events.
79    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
80    /// Cancellation token for graceful shutdown of long-running operations.
81    cancellation_token: tokio_util::sync::CancellationToken,
82}
83
84impl BlockchainDataClientCore {
85    /// Creates a new instance of [`BlockchainDataClientCore`].
86    ///
87    /// # Panics
88    ///
89    /// Panics if `use_hypersync_for_live_data` is false but `wss_rpc_url` is None.
90    #[must_use]
91    pub fn new(
92        config: BlockchainDataClientConfig,
93        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
94        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
95        cancellation_token: tokio_util::sync::CancellationToken,
96    ) -> Self {
97        let chain = config.chain.clone();
98        let cache = BlockchainCache::new(chain.clone());
99
100        // Log RPC endpoints being used
101        tracing::info!(
102            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
103            chain.name,
104            config.http_rpc_url
105        );
106
107        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
108            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
109            tracing::info!("WebSocket RPC URL: {}", wss_rpc_url);
110            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
111        } else {
112            tracing::info!("Using HyperSync for live data (no WebSocket RPC)");
113            None
114        };
115        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
116            config.http_rpc_url.clone(),
117            config.rpc_requests_per_second,
118        ));
119        let erc20_contract = Erc20Contract::new(
120            http_rpc_client.clone(),
121            config.pool_filters.remove_pools_with_empty_erc20fields,
122        );
123
124        let hypersync_client =
125            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
126        Self {
127            chain,
128            config,
129            rpc_client,
130            tokens: erc20_contract,
131            univ3_pool: UniswapV3PoolContract::new(http_rpc_client),
132            cache,
133            hypersync_client,
134            subscription_manager: DefiDataSubscriptionManager::new(),
135            data_tx,
136            cancellation_token,
137        }
138    }
139
140    /// Initializes the database connection for the blockchain cache.
141    pub async fn initialize_cache_database(&mut self) {
142        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
143            tracing::info!(
144                "Initializing blockchain cache on database '{}'",
145                pg_connect_options.database
146            );
147            self.cache
148                .initialize_database(pg_connect_options.clone().into())
149                .await;
150        }
151    }
152
153    /// Creates an appropriate blockchain RPC client for the specified blockchain.
154    fn initialize_rpc_client(
155        blockchain: Blockchain,
156        wss_rpc_url: String,
157    ) -> BlockchainRpcClientAny {
158        match blockchain {
159            Blockchain::Ethereum => {
160                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
161            }
162            Blockchain::Polygon => {
163                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
164            }
165            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
166            Blockchain::Arbitrum => {
167                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
168            }
169            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
170        }
171    }
172
173    /// Establishes connections to all configured data sources and initializes the cache.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if cache initialization or connection setup fails.
178    pub async fn connect(&mut self) -> anyhow::Result<()> {
179        tracing::info!(
180            "Connecting blockchain data client for '{}'",
181            self.chain.name
182        );
183        self.initialize_cache_database().await;
184
185        if let Some(ref mut rpc_client) = self.rpc_client {
186            rpc_client.connect().await?;
187        }
188
189        let from_block = self.determine_from_block();
190
191        tracing::info!(
192            "Connecting to blockchain data source for '{}' from block {}",
193            self.chain.name,
194            from_block.separate_with_commas()
195        );
196
197        // Initialize the chain and register the Dex exchanges in the cache.
198        self.cache.initialize_chain().await;
199        // Import the cached blockchain data.
200        self.cache.connect(from_block).await?;
201        // TODO disable block syncing for now as we don't have timestamps yet configured
202        // Sync the remaining blocks which are missing.
203        // self.sync_blocks(Some(from_block), None).await?;
204        for dex in self.config.dex_ids.clone() {
205            self.register_dex_exchange(dex).await?;
206            self.sync_exchange_pools(&dex, from_block, None, false)
207                .await?;
208        }
209
210        Ok(())
211    }
212
213    /// Syncs blocks with consistency checks to ensure data integrity.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if block syncing fails or if consistency checks fail.
218    pub async fn sync_blocks_checked(
219        &mut self,
220        from_block: u64,
221        to_block: Option<u64>,
222    ) -> anyhow::Result<()> {
223        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
224            // If blocks are consistent proceed with copy command.
225            if blocks_status.is_consistent() {
226                tracing::info!(
227                    "Cache is consistent: no gaps detected (last continuous block: {})",
228                    blocks_status.last_continuous_block
229                );
230                let target_block = max(blocks_status.max_block + 1, from_block);
231                tracing::info!(
232                    "Starting fast sync with COPY from block {}",
233                    target_block.separate_with_commas()
234                );
235                self.sync_blocks(target_block, to_block, true).await?;
236            } else {
237                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
238                tracing::info!(
239                    "Cache inconsistency detected: {} blocks missing between {} and {}",
240                    gap_size,
241                    blocks_status.last_continuous_block + 1,
242                    blocks_status.max_block
243                );
244
245                tracing::info!(
246                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
247                    blocks_status.last_continuous_block + 1,
248                    blocks_status.max_block
249                );
250                self.sync_blocks(
251                    blocks_status.last_continuous_block + 1,
252                    Some(blocks_status.max_block),
253                    false,
254                )
255                .await?;
256
257                tracing::info!(
258                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
259                    (blocks_status.max_block + 1).separate_with_commas()
260                );
261                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
262                    .await?;
263            }
264        } else {
265            self.sync_blocks(from_block, to_block, true).await?;
266        }
267
268        Ok(())
269    }
270
271    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if block fetching, caching, or database operations fail.
276    pub async fn sync_blocks(
277        &mut self,
278        from_block: u64,
279        to_block: Option<u64>,
280        use_copy_command: bool,
281    ) -> anyhow::Result<()> {
282        let to_block = if let Some(block) = to_block {
283            block
284        } else {
285            self.hypersync_client.current_block().await
286        };
287        let total_blocks = to_block.saturating_sub(from_block) + 1;
288        tracing::info!(
289            "Syncing blocks from {} to {} (total: {} blocks)",
290            from_block.separate_with_commas(),
291            to_block.separate_with_commas(),
292            total_blocks.separate_with_commas()
293        );
294
295        // Enable performance settings for sync operations
296        if let Err(e) = self.cache.toggle_performance_settings(true).await {
297            tracing::warn!("Failed to enable performance settings: {e}");
298        }
299
300        let blocks_stream = self
301            .hypersync_client
302            .request_blocks_stream(from_block, Some(to_block))
303            .await;
304
305        tokio::pin!(blocks_stream);
306
307        let mut metrics = BlockchainSyncReporter::new(
308            BlockchainSyncReportItems::Blocks,
309            from_block,
310            total_blocks,
311            BLOCKS_PROCESS_IN_SYNC_REPORT,
312        );
313
314        // Batch configuration
315        const BATCH_SIZE: usize = 1000;
316        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);
317
318        let cancellation_token = self.cancellation_token.clone();
319        let sync_result = tokio::select! {
320            () = cancellation_token.cancelled() => {
321                tracing::info!("Block sync cancelled");
322                Err(anyhow::anyhow!("Sync cancelled"))
323            }
324            result = async {
325                while let Some(block) = blocks_stream.next().await {
326                    let block_number = block.number;
327                    if self.cache.get_block_timestamp(block_number).is_some() {
328                        continue;
329                    }
330                    batch.push(block);
331
332                    // Process batch when full or last block
333                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
334                        let batch_size = batch.len();
335
336                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
337                        metrics.update(batch_size);
338
339                        // Re-initialize batch vector
340                        batch = Vec::with_capacity(BATCH_SIZE);
341                    }
342
343                    // Log progress if needed
344                    if metrics.should_log_progress(block_number, to_block) {
345                        metrics.log_progress(block_number);
346                    }
347                }
348
349                // Process any remaining blocks
350                if !batch.is_empty() {
351                    let batch_size = batch.len();
352                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
353                    metrics.update(batch_size);
354                }
355
356                metrics.log_final_stats();
357                Ok(())
358            } => result
359        };
360
361        sync_result?;
362
363        // Restore default safe settings after sync completion
364        if let Err(e) = self.cache.toggle_performance_settings(false).await {
365            tracing::warn!("Failed to restore default settings: {e}");
366        }
367
368        Ok(())
369    }
370
371    /// Synchronizes all events for a specific pool within the given block range.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if event syncing, parsing, or database operations fail.
376    pub async fn sync_pool_events(
377        &mut self,
378        dex: &DexType,
379        pool_identifier: PoolIdentifier,
380        from_block: Option<u64>,
381        to_block: Option<u64>,
382        reset: bool,
383    ) -> anyhow::Result<()> {
384        let pool: SharedPool = self.get_pool(&pool_identifier)?.clone();
385        let pool_display = pool.to_full_spec_string();
386        let from_block = from_block.unwrap_or(pool.creation_block);
387        // Extract address for blockchain queries
388        let pool_address = &pool.address;
389
390        let (last_synced_block, effective_from_block) = if reset {
391            (None, from_block)
392        } else {
393            let last_synced_block = self
394                .cache
395                .get_pool_last_synced_block(dex, &pool_identifier)
396                .await?;
397            let effective_from_block = last_synced_block
398                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
399            (last_synced_block, effective_from_block)
400        };
401
402        let to_block = match to_block {
403            Some(block) => block,
404            None => self.hypersync_client.current_block().await,
405        };
406
407        // Skip sync if we're already up to date
408        if effective_from_block > to_block {
409            tracing::info!(
410                "D {} already synced to block {} (current: {}), skipping sync",
411                dex,
412                last_synced_block.unwrap_or(0).separate_with_commas(),
413                to_block.separate_with_commas()
414            );
415            return Ok(());
416        }
417
418        // Query table max blocks to detect last blocks to use batch insert before that, then COPY command.
419        let last_block_across_pool_events_table = self
420            .cache
421            .get_pool_event_tables_last_block(&pool_identifier)
422            .await?;
423
424        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
425        tracing::info!(
426            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
427            pool_display,
428            effective_from_block.separate_with_commas(),
429            to_block.separate_with_commas(),
430            total_blocks.separate_with_commas(),
431            if let Some(last_synced) = last_synced_block {
432                format!(
433                    " - resuming from last synced block {}",
434                    last_synced.separate_with_commas()
435                )
436            } else {
437                String::new()
438            }
439        );
440
441        let mut metrics = BlockchainSyncReporter::new(
442            BlockchainSyncReportItems::PoolEvents,
443            effective_from_block,
444            total_blocks,
445            BLOCKS_PROCESS_IN_SYNC_REPORT,
446        );
447        let dex_extended = self.get_dex_extended(dex)?.clone();
448        let swap_event_signature = dex_extended.swap_created_event.as_ref();
449        let mint_event_signature = dex_extended.mint_created_event.as_ref();
450        let burn_event_signature = dex_extended.burn_created_event.as_ref();
451        let collect_event_signature = dex_extended.collect_created_event.as_ref();
452        let flash_event_signature = dex_extended.flash_created_event.as_ref();
453        let initialize_event_signature: Option<&str> =
454            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());
455
456        // Pre-decode event signatures to bytes for efficient comparison
457        let swap_sig_bytes = hex::decode(
458            swap_event_signature
459                .strip_prefix("0x")
460                .unwrap_or(swap_event_signature),
461        )?;
462        let mint_sig_bytes = hex::decode(
463            mint_event_signature
464                .strip_prefix("0x")
465                .unwrap_or(mint_event_signature),
466        )?;
467        let burn_sig_bytes = hex::decode(
468            burn_event_signature
469                .strip_prefix("0x")
470                .unwrap_or(burn_event_signature),
471        )?;
472        let collect_sig_bytes = hex::decode(
473            collect_event_signature
474                .strip_prefix("0x")
475                .unwrap_or(collect_event_signature),
476        )?;
477        let flash_sig_bytes = flash_event_signature
478            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
479        let initialize_sig_bytes = initialize_event_signature
480            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
481
482        let mut event_signatures = vec![
483            swap_event_signature,
484            mint_event_signature,
485            burn_event_signature,
486            collect_event_signature,
487        ];
488        if let Some(event) = dex_extended.initialize_event.as_ref() {
489            event_signatures.push(event);
490        }
491        if let Some(event) = dex_extended.flash_created_event.as_ref() {
492            event_signatures.push(event);
493        }
494        let pool_events_stream = self
495            .hypersync_client
496            .request_contract_events_stream(
497                effective_from_block,
498                Some(to_block),
499                pool_address,
500                event_signatures,
501            )
502            .await;
503        tokio::pin!(pool_events_stream);
504
505        let mut last_block_saved = effective_from_block;
506        let mut blocks_processed = 0;
507
508        // Batch configuration for events
509        const EVENT_BATCH_SIZE: usize = 20000;
510        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
511        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
512        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
513        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);
514
515        // Track when we've moved beyond stale data and can use COPY
516        let mut beyond_stale_data = last_block_across_pool_events_table
517            .is_none_or(|tables_max| effective_from_block > tables_max);
518
519        let cancellation_token = self.cancellation_token.clone();
520        let sync_result = tokio::select! {
521            () = cancellation_token.cancelled() => {
522                tracing::info!("Pool event sync cancelled");
523                Err(anyhow::anyhow!("Sync cancelled"))
524            }
525            result = async {
526                while let Some(log) = pool_events_stream.next().await {
527                    let block_number = extract_block_number(&log)?;
528                    blocks_processed += block_number - last_block_saved;
529                    last_block_saved = block_number;
530
531                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
532            if event_sig_bytes == swap_sig_bytes.as_slice() {
533                let swap_event = dex_extended.parse_swap_event_hypersync(log)?;
534                match self.process_pool_swap_event(&swap_event, &pool) {
535                    Ok(swap) => swap_batch.push(swap),
536                    Err(e) => tracing::error!("Failed to process swap event: {e}"),
537                }
538            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
539                let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
540                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
541                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
542                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
543                }
544            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
545                let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
546                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
547                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
548                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
549                }
550            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
551                let collect_event = dex_extended.parse_collect_event_hypersync(log)?;
552                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
553                    Ok(fee_collect) => collect_batch.push(fee_collect),
554                    Err(e) => tracing::error!("Failed to process collect event: {e}"),
555                }
556            } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
557                let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
558                self.cache
559                    .update_pool_initialize_price_tick(&initialize_event)
560                    .await?;
561            } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
562                if let Some(parse_fn) = dex_extended.parse_flash_event_hypersync_fn {
563                    match parse_fn(dex_extended.dex.clone(), log) {
564                        Ok(flash_event) => {
565                            match self.process_pool_flash_event(&flash_event, &pool) {
566                                Ok(flash) => flash_batch.push(flash),
567                                Err(e) => tracing::error!("Failed to process flash event: {e}"),
568                            }
569                        }
570                        Err(e) => tracing::error!("Failed to parse flash event: {e}"),
571                    }
572                }
573            } else {
574                let event_signature = hex::encode(event_sig_bytes);
575                tracing::error!(
576                    "Unexpected event signature: {} for log {:?}",
577                    event_signature,
578                    log
579                );
580            }
581
582            // Check if we've moved beyond stale data (transition point for strategy change)
583            if !beyond_stale_data
584                && last_block_across_pool_events_table
585                    .is_some_and(|table_max| block_number > table_max)
586            {
587                tracing::info!(
588                    "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
589                    block_number
590                );
591
592                // Flush all batches with ON CONFLICT to handle any remaining duplicates
593                self.flush_event_batches(
594                    EVENT_BATCH_SIZE,
595                    &mut swap_batch,
596                    &mut liquidity_batch,
597                    &mut collect_batch,
598                    &mut flash_batch,
599                    false,
600                    true,
601                )
602                .await?;
603
604                beyond_stale_data = true;
605                tracing::info!("Switched to COPY mode - future batches will use COPY command");
606            } else {
607                // Process batches when they reach batch size
608                self.flush_event_batches(
609                    EVENT_BATCH_SIZE,
610                    &mut swap_batch,
611                    &mut liquidity_batch,
612                    &mut collect_batch,
613                    &mut flash_batch,
614                    false, // TODO temporary dont use copy command
615                    false,
616                )
617                .await?;
618            }
619
620            metrics.update(blocks_processed as usize);
621            blocks_processed = 0;
622
623            // Log progress if needed
624            if metrics.should_log_progress(block_number, to_block) {
625                metrics.log_progress(block_number);
626                self.cache
627                    .update_pool_last_synced_block(dex, &pool_identifier, block_number)
628                    .await?;
629            }
630        }
631
632        self.flush_event_batches(
633            EVENT_BATCH_SIZE,
634            &mut swap_batch,
635            &mut liquidity_batch,
636            &mut collect_batch,
637            &mut flash_batch,
638            false,
639            true,
640        )
641        .await?;
642
643        metrics.log_final_stats();
644        self.cache
645            .update_pool_last_synced_block(dex, &pool_identifier, to_block)
646            .await?;
647
648        tracing::info!(
649            "Successfully synced Dex '{}' Pool '{}' up to block {}",
650            dex,
651            pool_display,
652            to_block.separate_with_commas()
653        );
654                Ok(())
655            } => result
656        };
657
658        sync_result
659    }
660
661    #[allow(clippy::too_many_arguments)]
662    async fn flush_event_batches(
663        &mut self,
664        event_batch_size: usize,
665        swap_batch: &mut Vec<PoolSwap>,
666        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
667        collect_batch: &mut Vec<PoolFeeCollect>,
668        flash_batch: &mut Vec<PoolFlash>,
669        use_copy_command: bool,
670        force_flush_all: bool,
671    ) -> anyhow::Result<()> {
672        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
673            self.cache
674                .add_pool_swaps_batch(swap_batch, use_copy_command)
675                .await?;
676            swap_batch.clear();
677        }
678        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
679            && !liquidity_batch.is_empty()
680        {
681            self.cache
682                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
683                .await?;
684            liquidity_batch.clear();
685        }
686        if (force_flush_all || collect_batch.len() >= event_batch_size) && !collect_batch.is_empty()
687        {
688            self.cache
689                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
690                .await?;
691            collect_batch.clear();
692        }
693        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
694            self.cache.add_pool_flash_batch(flash_batch).await?;
695            flash_batch.clear();
696        }
697        Ok(())
698    }
699
700    /// Processes a swap event and converts it to a pool swap.
701    ///
702    /// # Errors
703    ///
704    /// Returns an error if swap event processing fails.
705    ///
706    /// # Panics
707    ///
708    /// Panics if swap event conversion to trade data fails.
709    pub fn process_pool_swap_event(
710        &self,
711        swap_event: &SwapEvent,
712        pool: &SharedPool,
713    ) -> anyhow::Result<PoolSwap> {
714        let timestamp = self
715            .cache
716            .get_block_timestamp(swap_event.block_number)
717            .copied();
718        let mut swap = swap_event.to_pool_swap(
719            self.chain.clone(),
720            pool.instrument_id,
721            pool.pool_identifier,
722            timestamp,
723        );
724        swap.calculate_trade_info(&pool.token0, &pool.token1, None)?;
725
726        Ok(swap)
727    }
728
729    /// Processes a mint event (liquidity addition) and converts it to a `PoolLiquidityUpdate`.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if mint event processing fails or if the liquidity update creation fails.
734    pub fn process_pool_mint_event(
735        &self,
736        mint_event: &MintEvent,
737        pool: &SharedPool,
738        dex_extended: &DexExtended,
739    ) -> anyhow::Result<PoolLiquidityUpdate> {
740        let timestamp = self
741            .cache
742            .get_block_timestamp(mint_event.block_number)
743            .copied();
744
745        let liquidity_update = mint_event.to_pool_liquidity_update(
746            self.chain.clone(),
747            dex_extended.dex.clone(),
748            pool.instrument_id,
749            timestamp,
750        );
751
752        // self.cache.add_liquidity_update(&liquidity_update).await?;
753
754        Ok(liquidity_update)
755    }
756
757    /// Processes a burn event (liquidity removal) and converts it to a `PoolLiquidityUpdate`.
758    /// Processes a pool burn event and converts it to a liquidity update.
759    ///
760    /// # Errors
761    ///
762    /// Returns an error if the burn event processing fails or if the liquidity update creation fails.
763    pub fn process_pool_burn_event(
764        &self,
765        burn_event: &BurnEvent,
766        pool: &SharedPool,
767        dex_extended: &DexExtended,
768    ) -> anyhow::Result<PoolLiquidityUpdate> {
769        let timestamp = self
770            .cache
771            .get_block_timestamp(burn_event.block_number)
772            .copied();
773
774        let liquidity_update = burn_event.to_pool_liquidity_update(
775            self.chain.clone(),
776            dex_extended.dex.clone(),
777            pool.instrument_id,
778            pool.pool_identifier,
779            timestamp,
780        );
781
782        // self.cache.add_liquidity_update(&liquidity_update).await?;
783
784        Ok(liquidity_update)
785    }
786
787    /// Processes a pool collect event and converts it to a fee collection.
788    ///
789    /// # Errors
790    ///
791    /// Returns an error if the collect event processing fails or if the fee collection creation fails.
792    pub fn process_pool_collect_event(
793        &self,
794        collect_event: &CollectEvent,
795        pool: &SharedPool,
796        dex_extended: &DexExtended,
797    ) -> anyhow::Result<PoolFeeCollect> {
798        let timestamp = self
799            .cache
800            .get_block_timestamp(collect_event.block_number)
801            .copied();
802
803        let fee_collect = collect_event.to_pool_fee_collect(
804            self.chain.clone(),
805            dex_extended.dex.clone(),
806            pool.instrument_id,
807            timestamp,
808        );
809
810        Ok(fee_collect)
811    }
812
813    /// Processes a pool flash event and converts it to a flash loan.
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if the flash event processing fails or if the flash loan creation fails.
818    pub fn process_pool_flash_event(
819        &self,
820        flash_event: &FlashEvent,
821        pool: &SharedPool,
822    ) -> anyhow::Result<PoolFlash> {
823        let timestamp = self
824            .cache
825            .get_block_timestamp(flash_event.block_number)
826            .copied();
827
828        let flash = flash_event.to_pool_flash(self.chain.clone(), pool.instrument_id, timestamp);
829
830        Ok(flash)
831    }
832
833    /// Synchronizes all pools and their tokens for a specific DEX within the given block range.
834    ///
835    /// This method performs a comprehensive sync of:
836    /// 1. Pool creation events from the DEX factory
837    /// 2. Token metadata for all tokens in discovered pools
838    /// 3. Pool entities with proper token associations
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if syncing pools, tokens, or DEX operations fail.
843    pub async fn sync_exchange_pools(
844        &mut self,
845        dex: &DexType,
846        from_block: u64,
847        to_block: Option<u64>,
848        reset: bool,
849    ) -> anyhow::Result<()> {
850        let dex_extended = self.get_dex_extended(dex)?.clone();
851
852        let mut service = PoolDiscoveryService::new(
853            self.chain.clone(),
854            &mut self.cache,
855            &self.tokens,
856            &self.hypersync_client,
857            self.cancellation_token.clone(),
858            self.config.clone(),
859        );
860
861        service
862            .sync_pools(&dex_extended, from_block, to_block, reset)
863            .await?;
864
865        Ok(())
866    }
867
868    /// Registers a decentralized exchange for data collection and event monitoring.
869    ///
870    /// Registration involves:
871    /// 1. Adding the DEX to the cache
872    /// 2. Loading existing pools for the DEX
873    /// 3. Configuring event signatures for subscriptions
874    ///
875    /// # Errors
876    ///
877    /// Returns an error if DEX registration, cache operations, or pool loading fails.
878    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
879        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
880            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);
881
882            self.cache.add_dex(dex_extended.dex.clone()).await?;
883            let _ = self.cache.load_pools(&dex_id).await?;
884
885            self.subscription_manager.register_dex_for_subscriptions(
886                dex_id,
887                dex_extended.swap_created_event.as_ref(),
888                dex_extended.mint_created_event.as_ref(),
889                dex_extended.burn_created_event.as_ref(),
890                dex_extended.collect_created_event.as_ref(),
891                dex_extended.flash_created_event.as_deref(),
892            );
893            Ok(())
894        } else {
895            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
896        }
897    }
898
899    /// Bootstraps a [`PoolProfiler`] with the latest state for a given pool.
900    ///
901    /// Uses two paths depending on whether the pool has been synced to the database:
902    /// - **Never synced**: Streams events from HyperSync → restores from on-chain RPC → returns `(profiler, true)`
903    /// - **Previously synced**: Syncs new events to DB → streams from DB → returns `(profiler, false)`
904    ///
905    /// Both paths restore from the latest valid snapshot first (if available), otherwise initialize with pool's initial price.
906    ///
907    /// # Returns
908    ///
909    /// - `PoolProfiler`: Hydrated profiler with current pool state
910    /// - `bool`: `true` if constructed from RPC (already valid), `false` if from DB (needs validation)
911    ///
912    /// # Errors
913    ///
914    /// Returns an error if database is not initialized or event processing fails.
915    ///
916    /// # Panics
917    ///
918    /// Panics if the database reference is unavailable.
919    pub async fn bootstrap_latest_pool_profiler(
920        &mut self,
921        pool: &SharedPool,
922    ) -> anyhow::Result<(PoolProfiler, bool)> {
923        tracing::info!(
924            "Bootstrapping latest pool profiler for pool {}",
925            pool.address
926        );
927
928        if self.cache.database.is_none() {
929            anyhow::bail!(
930                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
931            );
932        }
933
934        let mut profiler = PoolProfiler::new(pool.clone());
935
936        // Calculate latest valid block position after which we need to start profiling.
937        let from_position = match self
938            .cache
939            .database
940            .as_ref()
941            .unwrap()
942            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.pool_identifier)
943            .await
944        {
945            Ok(Some(snapshot)) => {
946                tracing::info!(
947                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
948                    snapshot.block_position.number.separate_with_commas(),
949                    snapshot.positions.len(),
950                    snapshot.ticks.len()
951                );
952                let block_position = snapshot.block_position.clone();
953                profiler.restore_from_snapshot(snapshot)?;
954                tracing::info!("Restored profiler from snapshot");
955                Some(block_position)
956            }
957            _ => {
958                tracing::info!("No valid snapshot found, processing from beginning");
959                None
960            }
961        };
962
963        // If we don't have never synced pool events, proceed with faster
964        // construction of pool profiler from hypersync and RPC, where we
965        // dont need syncing of pool events and fetching it from database
966        if self
967            .cache
968            .database
969            .as_ref()
970            .unwrap()
971            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.pool_identifier)
972            .await?
973            .is_none()
974        {
975            return self
976                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
977                .await;
978        }
979
980        // Sync the pool events before bootstrapping of pool profiler
981        if let Err(e) = self
982            .sync_pool_events(&pool.dex.name, pool.pool_identifier, None, None, false)
983            .await
984        {
985            tracing::error!("Failed to sync pool events for snapshot request: {}", e);
986        }
987
988        if !profiler.is_initialized {
989            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
990                profiler.initialize(initial_sqrt_price_x96);
991            } else {
992                anyhow::bail!(
993                    "Pool is not initialized and it doesn't contain initial price, cannot bootstrap profiler"
994                );
995            }
996        }
997
998        let from_block = from_position
999            .as_ref()
1000            .map_or(profiler.pool.creation_block, |block_position| {
1001                block_position.number
1002            });
1003        let to_block = self.hypersync_client.current_block().await;
1004        let total_blocks = to_block.saturating_sub(from_block) + 1;
1005
1006        // Enable embedded profiler reporting
1007        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1008
1009        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
1010            pool.chain.clone(),
1011            pool.dex.clone(),
1012            pool.instrument_id,
1013            pool.pool_identifier,
1014            from_position.clone(),
1015        );
1016
1017        while let Some(result) = stream.next().await {
1018            match result {
1019                Ok(event) => {
1020                    profiler.process(&event)?;
1021                }
1022                Err(e) => log::error!("Error processing event: {e}"),
1023            }
1024        }
1025
1026        profiler.finalize_reporting();
1027
1028        Ok((profiler, false))
1029    }
1030
1031    /// Constructs a pool profiler by fetching events directly from HyperSync RPC.
1032    ///
1033    /// This method is used when the pool has never been synced to the database. It streams
1034    /// liquidity events (mints, burns) directly from HyperSync and processes them
1035    /// to build up the profiler's state in real-time. After processing all events, it
1036    /// restores the profiler from the current on-chain state with the provided ticks and positions
1037    ///
1038    /// # Returns
1039    ///
1040    /// Returns a tuple of:
1041    /// - `PoolProfiler`: The hydrated profiler with state built from events
1042    /// - `bool`: Always `true` to indicate the profiler state was valid, and it was constructed from RPC
1043    ///
1044    /// # Errors
1045    ///
1046    /// Returns an error if:
1047    /// - Event streaming from HyperSync fails
1048    /// - Event parsing or processing fails
1049    /// - DEX configuration is invalid
1050    async fn construct_pool_profiler_from_hypersync_rpc(
1051        &self,
1052        mut profiler: PoolProfiler,
1053        from_position: Option<BlockPosition>,
1054    ) -> anyhow::Result<(PoolProfiler, bool)> {
1055        tracing::info!(
1056            "Constructing pool profiler from hypersync stream and RPC final state querying"
1057        );
1058        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
1059        let mint_event_signature = dex_extended.mint_created_event.as_ref();
1060        let burn_event_signature = dex_extended.burn_created_event.as_ref();
1061        let initialize_event_signature =
1062            if let Some(initialize_event) = &dex_extended.initialize_event {
1063                initialize_event.as_ref()
1064            } else {
1065                anyhow::bail!(
1066                    "DEX {} does not have initialize event set.",
1067                    &profiler.pool.dex.name
1068                );
1069            };
1070        let mint_sig_bytes = hex::decode(
1071            mint_event_signature
1072                .strip_prefix("0x")
1073                .unwrap_or(mint_event_signature),
1074        )?;
1075        let burn_sig_bytes = hex::decode(
1076            burn_event_signature
1077                .strip_prefix("0x")
1078                .unwrap_or(burn_event_signature),
1079        )?;
1080        let initialize_sig_bytes = hex::decode(
1081            initialize_event_signature
1082                .strip_prefix("0x")
1083                .unwrap_or(initialize_event_signature),
1084        )?;
1085
1086        let from_block = from_position.map_or(profiler.pool.creation_block, |block_position| {
1087            block_position.number
1088        });
1089        let to_block = self.hypersync_client.current_block().await;
1090        let total_blocks = to_block.saturating_sub(from_block) + 1;
1091
1092        tracing::info!(
1093            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
1094            profiler.pool.address,
1095            from_block.separate_with_commas(),
1096            to_block.separate_with_commas(),
1097            total_blocks.separate_with_commas()
1098        );
1099
1100        // Enable embedded profiler reporting
1101        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1102
1103        let pool_events_stream = self
1104            .hypersync_client
1105            .request_contract_events_stream(
1106                from_block,
1107                None,
1108                &profiler.pool.address,
1109                vec![
1110                    mint_event_signature,
1111                    burn_event_signature,
1112                    initialize_event_signature,
1113                ],
1114            )
1115            .await;
1116        tokio::pin!(pool_events_stream);
1117
1118        while let Some(log) = pool_events_stream.next().await {
1119            let event_sig_bytes = extract_event_signature_bytes(&log)?;
1120
1121            if event_sig_bytes == initialize_sig_bytes {
1122                let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
1123                profiler.initialize(initialize_event.sqrt_price_x96);
1124                self.cache
1125                    .database
1126                    .as_ref()
1127                    .unwrap()
1128                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
1129                    .await?;
1130            } else if event_sig_bytes == mint_sig_bytes {
1131                let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
1132                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
1133                    Ok(liquidity_update) => {
1134                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1135                    }
1136                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
1137                }
1138            } else if event_sig_bytes == burn_sig_bytes {
1139                let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
1140                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
1141                    Ok(liquidity_update) => {
1142                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1143                    }
1144                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
1145                }
1146            } else {
1147                let event_signature = hex::encode(event_sig_bytes);
1148                tracing::error!(
1149                    "Unexpected event signature in bootstrap_latest_pool_profiler: {} for log {:?}",
1150                    event_signature,
1151                    log
1152                );
1153            }
1154        }
1155
1156        profiler.finalize_reporting();
1157
1158        // Hydrate from the current RPC state
1159        match self.get_on_chain_snapshot(&profiler).await {
1160            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
1161            Err(e) => tracing::error!(
1162                "Failed to restore from on-chain snapshot: {e}. Sending not hydrated state to client."
1163            ),
1164        }
1165
1166        Ok((profiler, true))
1167    }
1168
1169    /// Validates a pool profiler's state against on-chain data for accuracy verification.
1170    ///
1171    /// This method performs integrity checking by comparing the profiler's internal state
1172    /// (positions, ticks, liquidity) with the actual on-chain smart contract state. For UniswapV3
1173    /// pools, it fetches current on-chain data and verifies that the profiler's tracked state matches.
1174    /// If validation succeeds or is bypassed, the snapshot is marked as valid in the database.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns an error if database operations fail when marking the snapshot as valid.
1179    ///
1180    /// # Panics
1181    ///
1182    /// Panics if the profiler does not have a last_processed_event when already_validated is true.
1183    pub async fn check_snapshot_validity(
1184        &self,
1185        profiler: &PoolProfiler,
1186        already_validated: bool,
1187    ) -> anyhow::Result<bool> {
1188        // Determine validity and get block position for marking
1189        let (is_valid, block_position) = if already_validated {
1190            // Skip RPC call - profiler was validated during construction from RPC
1191            tracing::info!("Snapshot already validated from RPC, skipping on-chain comparison");
1192            let last_event = profiler
1193                .last_processed_event
1194                .clone()
1195                .expect("Profiler should have last_processed_event");
1196            (true, last_event)
1197        } else {
1198            // Fetch on-chain state and compare
1199            match self.get_on_chain_snapshot(profiler).await {
1200                Ok(on_chain_snapshot) => {
1201                    tracing::info!("Comparing profiler state with on-chain state...");
1202                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
1203                    if !valid {
1204                        tracing::error!(
1205                            "Pool profiler state does NOT match on-chain smart contract state"
1206                        );
1207                    }
1208                    (valid, on_chain_snapshot.block_position)
1209                }
1210                Err(e) => {
1211                    tracing::error!("Failed to check snapshot validity: {e}");
1212                    return Ok(false);
1213                }
1214            }
1215        };
1216
1217        // Mark snapshot as valid in database if validation passed
1218        if is_valid && let Some(cache_database) = &self.cache.database {
1219            cache_database
1220                .mark_pool_snapshot_valid(
1221                    profiler.pool.chain.chain_id,
1222                    &profiler.pool.pool_identifier,
1223                    block_position.number,
1224                    block_position.transaction_index,
1225                    block_position.log_index,
1226                )
1227                .await?;
1228            tracing::info!("Marked pool profiler snapshot as valid");
1229        }
1230
1231        Ok(is_valid)
1232    }
1233
1234    /// Fetches current on-chain pool state at the last processed block.
1235    ///
1236    /// Queries the pool smart contract to retrieve active tick liquidity and position data,
1237    /// using the profiler's active positions and last processed block number.
1238    /// Used for profiler state restoration after bootstrapping and validation.
1239    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
1240        if profiler.pool.dex.name == DexType::UniswapV3 {
1241            let last_processed_event = profiler
1242                .last_processed_event
1243                .clone()
1244                .expect("We expect at least one processed event in the pool");
1245            let on_chain_snapshot = self
1246                .univ3_pool
1247                .fetch_snapshot(
1248                    &profiler.pool.address,
1249                    profiler.pool.instrument_id,
1250                    profiler.get_active_tick_values().as_slice(),
1251                    &profiler.get_all_position_keys(),
1252                    last_processed_event,
1253                )
1254                .await?;
1255
1256            Ok(on_chain_snapshot)
1257        } else {
1258            anyhow::bail!(
1259                "Fetching on-chain snapshot for Dex protocol {} is not supported yet.",
1260                profiler.pool.dex.name
1261            )
1262        }
1263    }
1264
1265    /// Replays historical events for a pool to hydrate its profiler state.
1266    ///
1267    /// Streams all historical swap, liquidity, and fee collect events from the database
1268    /// and sends them through the normal data event pipeline to build up pool profiler state.
1269    ///
1270    /// # Errors
1271    ///
1272    /// Returns an error if database streaming fails or event processing fails.
1273    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
1274        if let Some(database) = &self.cache.database {
1275            tracing::info!(
1276                "Replaying historical events for pool {} to hydrate profiler",
1277                pool.instrument_id
1278            );
1279
1280            let mut event_stream = database.stream_pool_events(
1281                self.chain.clone(),
1282                dex.clone(),
1283                pool.instrument_id,
1284                pool.pool_identifier,
1285                None,
1286            );
1287            let mut event_count = 0;
1288
1289            while let Some(event_result) = event_stream.next().await {
1290                match event_result {
1291                    Ok(event) => {
1292                        let data_event = match event {
1293                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
1294                            DexPoolData::LiquidityUpdate(update) => {
1295                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
1296                            }
1297                            DexPoolData::FeeCollect(collect) => {
1298                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
1299                            }
1300                            DexPoolData::Flash(flash) => {
1301                                DataEvent::DeFi(DefiData::PoolFlash(flash))
1302                            }
1303                        };
1304                        self.send_data(data_event);
1305                        event_count += 1;
1306                    }
1307                    Err(e) => {
1308                        tracing::error!(
1309                            "Error streaming event for pool {}: {e}",
1310                            pool.instrument_id
1311                        );
1312                    }
1313                }
1314            }
1315
1316            tracing::info!(
1317                "Replayed {event_count} historical events for pool {}",
1318                pool.instrument_id
1319            );
1320        } else {
1321            tracing::debug!(
1322                "No database available, skipping event replay for pool {}",
1323                pool.instrument_id
1324            );
1325        }
1326
1327        Ok(())
1328    }
1329
1330    /// Determines the starting block for syncing operations.
1331    fn determine_from_block(&self) -> u64 {
1332        self.config
1333            .from_block
1334            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
1335    }
1336
1337    /// Retrieves extended DEX information for a registered DEX.
1338    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
1339        if !self.cache.get_registered_dexes().contains(dex_id) {
1340            anyhow::bail!("DEX {dex_id} is not registered in the data client");
1341        }
1342
1343        match get_dex_extended(self.chain.name, dex_id) {
1344            Some(dex) => Ok(dex),
1345            None => anyhow::bail!("Dex {dex_id} doesn't exist for chain {}", self.chain.name),
1346        }
1347    }
1348
1349    /// Retrieves a pool from the cache by its address.
1350    ///
1351    /// # Errors
1352    ///
1353    /// Returns an error if the pool is not registered in the cache.
1354    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> anyhow::Result<&SharedPool> {
1355        match self.cache.get_pool(pool_identifier) {
1356            Some(pool) => Ok(pool),
1357            None => anyhow::bail!("Pool {pool_identifier} is not registered"),
1358        }
1359    }
1360
1361    /// Sends a data event to all subscribers through the data channel.
1362    pub fn send_data(&self, data: DataEvent) {
1363        if let Some(data_tx) = &self.data_tx {
1364            tracing::debug!("Sending {data}");
1365
1366            if let Err(e) = data_tx.send(data) {
1367                tracing::error!("Failed to send data: {e}");
1368            }
1369        } else {
1370            tracing::error!("No data event channel for sending data");
1371        }
1372    }
1373
1374    /// Disconnects all active connections and cleanup resources.
1375    ///
1376    /// This method should be called when shutting down the client to ensure
1377    /// proper cleanup of network connections and background tasks.
1378    pub async fn disconnect(&mut self) {
1379        self.hypersync_client.disconnect().await;
1380    }
1381}