nautilus_blockchain/data/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp::max, sync::Arc};
17
18use futures_util::StreamExt;
19use nautilus_common::messages::DataEvent;
20use nautilus_core::formatting::Separable;
21use nautilus_model::defi::{
22    Block, Blockchain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolProfiler, PoolSwap,
23    SharedChain, SharedDex, SharedPool,
24    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
25    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
26    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
27};
28
29use crate::{
30    cache::BlockchainCache,
31    config::BlockchainDataClientConfig,
32    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
33    data::subscription::DefiDataSubscriptionManager,
34    events::{
35        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent, swap::SwapEvent,
36    },
37    exchanges::{extended::DexExtended, get_dex_extended},
38    hypersync::{
39        client::HyperSyncClient,
40        helpers::{extract_block_number, extract_event_signature_bytes},
41    },
42    rpc::{
43        BlockchainRpcClient, BlockchainRpcClientAny,
44        chains::{
45            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
46            polygon::PolygonRpcClient,
47        },
48        http::BlockchainHttpRpcClient,
49        types::BlockchainMessage,
50    },
51    services::PoolDiscoveryService,
52};
53
/// Block interval handed to [`BlockchainSyncReporter::new`] by the block and pool-event sync
/// routines in this file.
///
/// NOTE(review): presumably the number of blocks between two progress reports; confirm against
/// the reporter implementation.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
55
/// Core blockchain data client responsible for fetching, processing, and caching blockchain data.
///
/// This struct encapsulates the core functionality for interacting with blockchain networks,
/// including syncing historical data, processing real-time events, and managing cached entities.
///
/// Instances are created with [`BlockchainDataClientCore::new`], which builds one HTTP RPC client
/// and shares it between the ERC20 and UniswapV3 pool contract interfaces.
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain being targeted by this client instance.
    pub chain: SharedChain,
    /// The configuration for the data client.
    pub config: BlockchainDataClientConfig,
    /// Local cache for blockchain entities.
    pub cache: BlockchainCache,
    /// Interface for interacting with ERC20 token contracts.
    tokens: Erc20Contract,
    /// Interface for interacting with UniswapV3 pool contracts.
    univ3_pool: UniswapV3PoolContract,
    /// Client for the HyperSync data indexing service.
    pub hypersync_client: HyperSyncClient,
    /// Optional WebSocket RPC client for direct blockchain node communication.
    ///
    /// Only populated when `use_hypersync_for_live_data` is disabled and a `wss_rpc_url` is
    /// configured; `None` otherwise.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manages subscriptions for various DEX events (swaps, mints, burns).
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel sender for data events.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    /// Cancellation token for graceful shutdown of long-running operations.
    ///
    /// The block and pool-event sync loops race against this token and abort with an error
    /// once it is cancelled.
    cancellation_token: tokio_util::sync::CancellationToken,
}
83
84impl BlockchainDataClientCore {
85    /// Creates a new instance of [`BlockchainDataClientCore`].
86    ///
87    /// # Panics
88    ///
89    /// Panics if `use_hypersync_for_live_data` is false but `wss_rpc_url` is None.
90    #[must_use]
91    pub fn new(
92        config: BlockchainDataClientConfig,
93        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
94        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
95        cancellation_token: tokio_util::sync::CancellationToken,
96    ) -> Self {
97        let chain = config.chain.clone();
98        let cache = BlockchainCache::new(chain.clone());
99
100        // Log RPC endpoints being used
101        log::info!(
102            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
103            chain.name,
104            config.http_rpc_url
105        );
106
107        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
108            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
109            log::info!("WebSocket RPC URL: {wss_rpc_url}");
110            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
111        } else {
112            log::info!("Using HyperSync for live data (no WebSocket RPC)");
113            None
114        };
115        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
116            config.http_rpc_url.clone(),
117            config.rpc_requests_per_second,
118        ));
119        let erc20_contract = Erc20Contract::new(
120            http_rpc_client.clone(),
121            config.pool_filters.remove_pools_with_empty_erc20fields,
122        );
123
124        let hypersync_client =
125            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
126        Self {
127            chain,
128            config,
129            rpc_client,
130            tokens: erc20_contract,
131            univ3_pool: UniswapV3PoolContract::new(http_rpc_client),
132            cache,
133            hypersync_client,
134            subscription_manager: DefiDataSubscriptionManager::new(),
135            data_tx,
136            cancellation_token,
137        }
138    }
139
140    /// Initializes the database connection for the blockchain cache.
141    pub async fn initialize_cache_database(&mut self) {
142        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
143            log::info!(
144                "Initializing blockchain cache on database '{}'",
145                pg_connect_options.database
146            );
147            self.cache
148                .initialize_database(pg_connect_options.clone().into())
149                .await;
150        }
151    }
152
153    /// Creates an appropriate blockchain RPC client for the specified blockchain.
154    fn initialize_rpc_client(
155        blockchain: Blockchain,
156        wss_rpc_url: String,
157    ) -> BlockchainRpcClientAny {
158        match blockchain {
159            Blockchain::Ethereum => {
160                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
161            }
162            Blockchain::Polygon => {
163                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
164            }
165            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
166            Blockchain::Arbitrum => {
167                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
168            }
169            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
170        }
171    }
172
173    /// Establishes connections to all configured data sources and initializes the cache.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if cache initialization or connection setup fails.
178    pub async fn connect(&mut self) -> anyhow::Result<()> {
179        log::info!(
180            "Connecting blockchain data client for '{}'",
181            self.chain.name
182        );
183        self.initialize_cache_database().await;
184
185        if let Some(ref mut rpc_client) = self.rpc_client {
186            rpc_client.connect().await?;
187        }
188
189        let from_block = self.determine_from_block();
190
191        log::info!(
192            "Connecting to blockchain data source for '{}' from block {}",
193            self.chain.name,
194            from_block.separate_with_commas()
195        );
196
197        // Initialize the chain and register the Dex exchanges in the cache.
198        self.cache.initialize_chain().await;
199        // Import the cached blockchain data.
200        self.cache.connect(from_block).await?;
201        // TODO disable block syncing for now as we don't have timestamps yet configured
202        // Sync the remaining blocks which are missing.
203        // self.sync_blocks(Some(from_block), None).await?;
204        for dex in self.config.dex_ids.clone() {
205            self.register_dex_exchange(dex).await?;
206            self.sync_exchange_pools(&dex, from_block, None, false)
207                .await?;
208        }
209
210        Ok(())
211    }
212
213    /// Syncs blocks with consistency checks to ensure data integrity.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if block syncing fails or if consistency checks fail.
218    pub async fn sync_blocks_checked(
219        &mut self,
220        from_block: u64,
221        to_block: Option<u64>,
222    ) -> anyhow::Result<()> {
223        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
224            // If blocks are consistent proceed with copy command.
225            if blocks_status.is_consistent() {
226                log::info!(
227                    "Cache is consistent: no gaps detected (last continuous block: {})",
228                    blocks_status.last_continuous_block
229                );
230                let target_block = max(blocks_status.max_block + 1, from_block);
231                log::info!(
232                    "Starting fast sync with COPY from block {}",
233                    target_block.separate_with_commas()
234                );
235                self.sync_blocks(target_block, to_block, true).await?;
236            } else {
237                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
238                log::info!(
239                    "Cache inconsistency detected: {} blocks missing between {} and {}",
240                    gap_size,
241                    blocks_status.last_continuous_block + 1,
242                    blocks_status.max_block
243                );
244
245                log::info!(
246                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
247                    blocks_status.last_continuous_block + 1,
248                    blocks_status.max_block
249                );
250                self.sync_blocks(
251                    blocks_status.last_continuous_block + 1,
252                    Some(blocks_status.max_block),
253                    false,
254                )
255                .await?;
256
257                log::info!(
258                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
259                    (blocks_status.max_block + 1).separate_with_commas()
260                );
261                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
262                    .await?;
263            }
264        } else {
265            self.sync_blocks(from_block, to_block, true).await?;
266        }
267
268        Ok(())
269    }
270
271    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if block fetching, caching, or database operations fail.
276    pub async fn sync_blocks(
277        &mut self,
278        from_block: u64,
279        to_block: Option<u64>,
280        use_copy_command: bool,
281    ) -> anyhow::Result<()> {
282        const BATCH_SIZE: usize = 1000;
283
284        let to_block = if let Some(block) = to_block {
285            block
286        } else {
287            self.hypersync_client.current_block().await
288        };
289        let total_blocks = to_block.saturating_sub(from_block) + 1;
290        log::info!(
291            "Syncing blocks from {} to {} (total: {} blocks)",
292            from_block.separate_with_commas(),
293            to_block.separate_with_commas(),
294            total_blocks.separate_with_commas()
295        );
296
297        // Enable performance settings for sync operations
298        if let Err(e) = self.cache.toggle_performance_settings(true).await {
299            log::warn!("Failed to enable performance settings: {e}");
300        }
301
302        let blocks_stream = self
303            .hypersync_client
304            .request_blocks_stream(from_block, Some(to_block))
305            .await;
306
307        tokio::pin!(blocks_stream);
308
309        let mut metrics = BlockchainSyncReporter::new(
310            BlockchainSyncReportItems::Blocks,
311            from_block,
312            total_blocks,
313            BLOCKS_PROCESS_IN_SYNC_REPORT,
314        );
315
316        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);
317
318        let cancellation_token = self.cancellation_token.clone();
319        let sync_result = tokio::select! {
320            () = cancellation_token.cancelled() => {
321                log::info!("Block sync cancelled");
322                Err(anyhow::anyhow!("Sync cancelled"))
323            }
324            result = async {
325                while let Some(block) = blocks_stream.next().await {
326                    let block_number = block.number;
327                    if self.cache.get_block_timestamp(block_number).is_some() {
328                        continue;
329                    }
330                    batch.push(block);
331
332                    // Process batch when full or last block
333                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
334                        let batch_size = batch.len();
335
336                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
337                        metrics.update(batch_size);
338
339                        // Re-initialize batch vector
340                        batch = Vec::with_capacity(BATCH_SIZE);
341                    }
342
343                    // Log progress if needed
344                    if metrics.should_log_progress(block_number, to_block) {
345                        metrics.log_progress(block_number);
346                    }
347                }
348
349                // Process any remaining blocks
350                if !batch.is_empty() {
351                    let batch_size = batch.len();
352                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
353                    metrics.update(batch_size);
354                }
355
356                metrics.log_final_stats();
357                Ok(())
358            } => result
359        };
360
361        sync_result?;
362
363        // Restore default safe settings after sync completion
364        if let Err(e) = self.cache.toggle_performance_settings(false).await {
365            log::warn!("Failed to restore default settings: {e}");
366        }
367
368        Ok(())
369    }
370
371    /// Synchronizes all events for a specific pool within the given block range.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if event syncing, parsing, or database operations fail.
376    pub async fn sync_pool_events(
377        &mut self,
378        dex: &DexType,
379        pool_identifier: PoolIdentifier,
380        from_block: Option<u64>,
381        to_block: Option<u64>,
382        reset: bool,
383    ) -> anyhow::Result<()> {
384        const EVENT_BATCH_SIZE: usize = 20000;
385
386        let pool: SharedPool = self.get_pool(&pool_identifier)?.clone();
387        let pool_display = pool.to_full_spec_string();
388        let from_block = from_block.unwrap_or(pool.creation_block);
389        // Extract address for blockchain queries
390        let pool_address = &pool.address;
391
392        let (last_synced_block, effective_from_block) = if reset {
393            (None, from_block)
394        } else {
395            let last_synced_block = self
396                .cache
397                .get_pool_last_synced_block(dex, &pool_identifier)
398                .await?;
399            let effective_from_block = last_synced_block
400                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
401            (last_synced_block, effective_from_block)
402        };
403
404        let to_block = match to_block {
405            Some(block) => block,
406            None => self.hypersync_client.current_block().await,
407        };
408
409        // Skip sync if we're already up to date
410        if effective_from_block > to_block {
411            log::info!(
412                "D {} already synced to block {} (current: {}), skipping sync",
413                dex,
414                last_synced_block.unwrap_or(0).separate_with_commas(),
415                to_block.separate_with_commas()
416            );
417            return Ok(());
418        }
419
420        // Query table max blocks to detect last blocks to use batch insert before that, then COPY command.
421        let last_block_across_pool_events_table = self
422            .cache
423            .get_pool_event_tables_last_block(&pool_identifier)
424            .await?;
425
426        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
427        log::info!(
428            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
429            pool_display,
430            effective_from_block.separate_with_commas(),
431            to_block.separate_with_commas(),
432            total_blocks.separate_with_commas(),
433            if let Some(last_synced) = last_synced_block {
434                format!(
435                    " - resuming from last synced block {}",
436                    last_synced.separate_with_commas()
437                )
438            } else {
439                String::new()
440            }
441        );
442
443        let mut metrics = BlockchainSyncReporter::new(
444            BlockchainSyncReportItems::PoolEvents,
445            effective_from_block,
446            total_blocks,
447            BLOCKS_PROCESS_IN_SYNC_REPORT,
448        );
449        let dex_extended = self.get_dex_extended(dex)?.clone();
450        let swap_event_signature = dex_extended.swap_created_event.as_ref();
451        let mint_event_signature = dex_extended.mint_created_event.as_ref();
452        let burn_event_signature = dex_extended.burn_created_event.as_ref();
453        let collect_event_signature = dex_extended.collect_created_event.as_ref();
454        let flash_event_signature = dex_extended.flash_created_event.as_ref();
455        let initialize_event_signature: Option<&str> =
456            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());
457
458        // Pre-decode event signatures to bytes for efficient comparison
459        let swap_sig_bytes = hex::decode(
460            swap_event_signature
461                .strip_prefix("0x")
462                .unwrap_or(swap_event_signature),
463        )?;
464        let mint_sig_bytes = hex::decode(
465            mint_event_signature
466                .strip_prefix("0x")
467                .unwrap_or(mint_event_signature),
468        )?;
469        let burn_sig_bytes = hex::decode(
470            burn_event_signature
471                .strip_prefix("0x")
472                .unwrap_or(burn_event_signature),
473        )?;
474        let collect_sig_bytes = hex::decode(
475            collect_event_signature
476                .strip_prefix("0x")
477                .unwrap_or(collect_event_signature),
478        )?;
479        let flash_sig_bytes = flash_event_signature
480            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
481        let initialize_sig_bytes = initialize_event_signature
482            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
483
484        let mut event_signatures = vec![
485            swap_event_signature,
486            mint_event_signature,
487            burn_event_signature,
488            collect_event_signature,
489        ];
490        if let Some(event) = dex_extended.initialize_event.as_ref() {
491            event_signatures.push(event);
492        }
493        if let Some(event) = dex_extended.flash_created_event.as_ref() {
494            event_signatures.push(event);
495        }
496        let pool_events_stream = self
497            .hypersync_client
498            .request_contract_events_stream(
499                effective_from_block,
500                Some(to_block),
501                pool_address,
502                event_signatures,
503            )
504            .await;
505        tokio::pin!(pool_events_stream);
506
507        let mut last_block_saved = effective_from_block;
508        let mut blocks_processed = 0;
509
510        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
511        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
512        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
513        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);
514
515        // Track when we've moved beyond stale data and can use COPY
516        let mut beyond_stale_data = last_block_across_pool_events_table
517            .is_none_or(|tables_max| effective_from_block > tables_max);
518
519        let cancellation_token = self.cancellation_token.clone();
520        let sync_result = tokio::select! {
521            () = cancellation_token.cancelled() => {
522                log::info!("Pool event sync cancelled");
523                Err(anyhow::anyhow!("Sync cancelled"))
524            }
525            result = async {
526                while let Some(log) = pool_events_stream.next().await {
527                    let block_number = extract_block_number(&log)?;
528                    blocks_processed += block_number - last_block_saved;
529                    last_block_saved = block_number;
530
531                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
532            if event_sig_bytes == swap_sig_bytes.as_slice() {
533                let swap_event = dex_extended.parse_swap_event_hypersync(log)?;
534                match self.process_pool_swap_event(&swap_event, &pool) {
535                    Ok(swap) => swap_batch.push(swap),
536                    Err(e) => log::error!("Failed to process swap event: {e}"),
537                }
538            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
539                let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
540                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
541                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
542                    Err(e) => log::error!("Failed to process mint event: {e}"),
543                }
544            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
545                let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
546                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
547                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
548                    Err(e) => log::error!("Failed to process burn event: {e}"),
549                }
550            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
551                let collect_event = dex_extended.parse_collect_event_hypersync(log)?;
552                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
553                    Ok(fee_collect) => collect_batch.push(fee_collect),
554                    Err(e) => log::error!("Failed to process collect event: {e}"),
555                }
556            } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
557                let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
558                self.cache
559                    .update_pool_initialize_price_tick(&initialize_event)
560                    .await?;
561            } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
562                if let Some(parse_fn) = dex_extended.parse_flash_event_hypersync_fn {
563                    match parse_fn(dex_extended.dex.clone(), log) {
564                        Ok(flash_event) => {
565                            match self.process_pool_flash_event(&flash_event, &pool) {
566                                Ok(flash) => flash_batch.push(flash),
567                                Err(e) => log::error!("Failed to process flash event: {e}"),
568                            }
569                        }
570                        Err(e) => log::error!("Failed to parse flash event: {e}"),
571                    }
572                }
573            } else {
574                let event_signature = hex::encode(event_sig_bytes);
575                log::error!(
576                    "Unexpected event signature: {event_signature} for log {log:?}"
577                );
578            }
579
580            // Check if we've moved beyond stale data (transition point for strategy change)
581            if !beyond_stale_data
582                && last_block_across_pool_events_table
583                    .is_some_and(|table_max| block_number > table_max)
584            {
585                log::info!(
586                    "Crossed beyond stale data at block {block_number} - flushing current batches with ON CONFLICT, then switching to COPY"
587                );
588
589                // Flush all batches with ON CONFLICT to handle any remaining duplicates
590                self.flush_event_batches(
591                    EVENT_BATCH_SIZE,
592                    &mut swap_batch,
593                    &mut liquidity_batch,
594                    &mut collect_batch,
595                    &mut flash_batch,
596                    false,
597                    true,
598                )
599                .await?;
600
601                beyond_stale_data = true;
602                log::info!("Switched to COPY mode - future batches will use COPY command");
603            } else {
604                // Process batches when they reach batch size
605                self.flush_event_batches(
606                    EVENT_BATCH_SIZE,
607                    &mut swap_batch,
608                    &mut liquidity_batch,
609                    &mut collect_batch,
610                    &mut flash_batch,
611                    false, // TODO temporary dont use copy command
612                    false,
613                )
614                .await?;
615            }
616
617            metrics.update(blocks_processed as usize);
618            blocks_processed = 0;
619
620            // Log progress if needed
621            if metrics.should_log_progress(block_number, to_block) {
622                metrics.log_progress(block_number);
623                self.cache
624                    .update_pool_last_synced_block(dex, &pool_identifier, block_number)
625                    .await?;
626            }
627        }
628
629        self.flush_event_batches(
630            EVENT_BATCH_SIZE,
631            &mut swap_batch,
632            &mut liquidity_batch,
633            &mut collect_batch,
634            &mut flash_batch,
635            false,
636            true,
637        )
638        .await?;
639
640        metrics.log_final_stats();
641        self.cache
642            .update_pool_last_synced_block(dex, &pool_identifier, to_block)
643            .await?;
644
645        log::info!(
646            "Successfully synced Dex '{}' Pool '{}' up to block {}",
647            dex,
648            pool_display,
649            to_block.separate_with_commas()
650        );
651                Ok(())
652            } => result
653        };
654
655        sync_result
656    }
657
658    #[allow(clippy::too_many_arguments)]
659    async fn flush_event_batches(
660        &mut self,
661        event_batch_size: usize,
662        swap_batch: &mut Vec<PoolSwap>,
663        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
664        collect_batch: &mut Vec<PoolFeeCollect>,
665        flash_batch: &mut Vec<PoolFlash>,
666        use_copy_command: bool,
667        force_flush_all: bool,
668    ) -> anyhow::Result<()> {
669        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
670            self.cache
671                .add_pool_swaps_batch(swap_batch, use_copy_command)
672                .await?;
673            swap_batch.clear();
674        }
675        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
676            && !liquidity_batch.is_empty()
677        {
678            self.cache
679                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
680                .await?;
681            liquidity_batch.clear();
682        }
683        if (force_flush_all || collect_batch.len() >= event_batch_size) && !collect_batch.is_empty()
684        {
685            self.cache
686                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
687                .await?;
688            collect_batch.clear();
689        }
690        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
691            self.cache.add_pool_flash_batch(flash_batch).await?;
692            flash_batch.clear();
693        }
694        Ok(())
695    }
696
697    /// Processes a swap event and converts it to a pool swap.
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if swap event processing fails.
702    ///
703    /// # Panics
704    ///
705    /// Panics if swap event conversion to trade data fails.
706    pub fn process_pool_swap_event(
707        &self,
708        swap_event: &SwapEvent,
709        pool: &SharedPool,
710    ) -> anyhow::Result<PoolSwap> {
711        let timestamp = self
712            .cache
713            .get_block_timestamp(swap_event.block_number)
714            .copied();
715        let mut swap = swap_event.to_pool_swap(
716            self.chain.clone(),
717            pool.instrument_id,
718            pool.pool_identifier,
719            timestamp,
720        );
721        swap.calculate_trade_info(&pool.token0, &pool.token1, None)?;
722
723        Ok(swap)
724    }
725
726    /// Processes a mint event (liquidity addition) and converts it to a `PoolLiquidityUpdate`.
727    ///
728    /// # Errors
729    ///
730    /// Returns an error if mint event processing fails or if the liquidity update creation fails.
731    pub fn process_pool_mint_event(
732        &self,
733        mint_event: &MintEvent,
734        pool: &SharedPool,
735        dex_extended: &DexExtended,
736    ) -> anyhow::Result<PoolLiquidityUpdate> {
737        let timestamp = self
738            .cache
739            .get_block_timestamp(mint_event.block_number)
740            .copied();
741
742        let liquidity_update = mint_event.to_pool_liquidity_update(
743            self.chain.clone(),
744            dex_extended.dex.clone(),
745            pool.instrument_id,
746            timestamp,
747        );
748
749        // self.cache.add_liquidity_update(&liquidity_update).await?;
750
751        Ok(liquidity_update)
752    }
753
754    /// Processes a burn event (liquidity removal) and converts it to a `PoolLiquidityUpdate`.
755    /// Processes a pool burn event and converts it to a liquidity update.
756    ///
757    /// # Errors
758    ///
759    /// Returns an error if the burn event processing fails or if the liquidity update creation fails.
760    pub fn process_pool_burn_event(
761        &self,
762        burn_event: &BurnEvent,
763        pool: &SharedPool,
764        dex_extended: &DexExtended,
765    ) -> anyhow::Result<PoolLiquidityUpdate> {
766        let timestamp = self
767            .cache
768            .get_block_timestamp(burn_event.block_number)
769            .copied();
770
771        let liquidity_update = burn_event.to_pool_liquidity_update(
772            self.chain.clone(),
773            dex_extended.dex.clone(),
774            pool.instrument_id,
775            pool.pool_identifier,
776            timestamp,
777        );
778
779        // self.cache.add_liquidity_update(&liquidity_update).await?;
780
781        Ok(liquidity_update)
782    }
783
784    /// Processes a pool collect event and converts it to a fee collection.
785    ///
786    /// # Errors
787    ///
788    /// Returns an error if the collect event processing fails or if the fee collection creation fails.
789    pub fn process_pool_collect_event(
790        &self,
791        collect_event: &CollectEvent,
792        pool: &SharedPool,
793        dex_extended: &DexExtended,
794    ) -> anyhow::Result<PoolFeeCollect> {
795        let timestamp = self
796            .cache
797            .get_block_timestamp(collect_event.block_number)
798            .copied();
799
800        let fee_collect = collect_event.to_pool_fee_collect(
801            self.chain.clone(),
802            dex_extended.dex.clone(),
803            pool.instrument_id,
804            timestamp,
805        );
806
807        Ok(fee_collect)
808    }
809
810    /// Processes a pool flash event and converts it to a flash loan.
811    ///
812    /// # Errors
813    ///
814    /// Returns an error if the flash event processing fails or if the flash loan creation fails.
815    pub fn process_pool_flash_event(
816        &self,
817        flash_event: &FlashEvent,
818        pool: &SharedPool,
819    ) -> anyhow::Result<PoolFlash> {
820        let timestamp = self
821            .cache
822            .get_block_timestamp(flash_event.block_number)
823            .copied();
824
825        let flash = flash_event.to_pool_flash(self.chain.clone(), pool.instrument_id, timestamp);
826
827        Ok(flash)
828    }
829
830    /// Synchronizes all pools and their tokens for a specific DEX within the given block range.
831    ///
832    /// This method performs a comprehensive sync of:
833    /// 1. Pool creation events from the DEX factory
834    /// 2. Token metadata for all tokens in discovered pools
835    /// 3. Pool entities with proper token associations
836    ///
837    /// # Errors
838    ///
839    /// Returns an error if syncing pools, tokens, or DEX operations fail.
840    pub async fn sync_exchange_pools(
841        &mut self,
842        dex: &DexType,
843        from_block: u64,
844        to_block: Option<u64>,
845        reset: bool,
846    ) -> anyhow::Result<()> {
847        let dex_extended = self.get_dex_extended(dex)?.clone();
848
849        let mut service = PoolDiscoveryService::new(
850            self.chain.clone(),
851            &mut self.cache,
852            &self.tokens,
853            &self.hypersync_client,
854            self.cancellation_token.clone(),
855            self.config.clone(),
856        );
857
858        service
859            .sync_pools(&dex_extended, from_block, to_block, reset)
860            .await?;
861
862        Ok(())
863    }
864
865    /// Registers a decentralized exchange for data collection and event monitoring.
866    ///
867    /// Registration involves:
868    /// 1. Adding the DEX to the cache
869    /// 2. Loading existing pools for the DEX
870    /// 3. Configuring event signatures for subscriptions
871    ///
872    /// # Errors
873    ///
874    /// Returns an error if DEX registration, cache operations, or pool loading fails.
875    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
876        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
877            log::info!("Registering DEX {dex_id} on chain {}", self.chain.name);
878
879            self.cache.add_dex(dex_extended.dex.clone()).await?;
880            let _ = self.cache.load_pools(&dex_id).await?;
881
882            self.subscription_manager.register_dex_for_subscriptions(
883                dex_id,
884                dex_extended.swap_created_event.as_ref(),
885                dex_extended.mint_created_event.as_ref(),
886                dex_extended.burn_created_event.as_ref(),
887                dex_extended.collect_created_event.as_ref(),
888                dex_extended.flash_created_event.as_deref(),
889            );
890            Ok(())
891        } else {
892            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
893        }
894    }
895
896    /// Bootstraps a [`PoolProfiler`] with the latest state for a given pool.
897    ///
898    /// Uses two paths depending on whether the pool has been synced to the database:
899    /// - **Never synced**: Streams events from HyperSync → restores from on-chain RPC → returns `(profiler, true)`
900    /// - **Previously synced**: Syncs new events to DB → streams from DB → returns `(profiler, false)`
901    ///
902    /// Both paths restore from the latest valid snapshot first (if available), otherwise initialize with pool's initial price.
903    ///
904    /// # Returns
905    ///
906    /// - `PoolProfiler`: Hydrated profiler with current pool state
907    /// - `bool`: `true` if constructed from RPC (already valid), `false` if from DB (needs validation)
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if database is not initialized or event processing fails.
912    ///
913    /// # Panics
914    ///
915    /// Panics if the database reference is unavailable.
916    pub async fn bootstrap_latest_pool_profiler(
917        &mut self,
918        pool: &SharedPool,
919    ) -> anyhow::Result<(PoolProfiler, bool)> {
920        log::info!(
921            "Bootstrapping latest pool profiler for pool {}",
922            pool.address
923        );
924
925        if self.cache.database.is_none() {
926            anyhow::bail!(
927                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
928            );
929        }
930
931        let mut profiler = PoolProfiler::new(pool.clone());
932
933        // Calculate latest valid block position after which we need to start profiling.
934        let from_position = match self
935            .cache
936            .database
937            .as_ref()
938            .unwrap()
939            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.pool_identifier)
940            .await
941        {
942            Ok(Some(snapshot)) => {
943                log::info!(
944                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
945                    snapshot.block_position.number.separate_with_commas(),
946                    snapshot.positions.len(),
947                    snapshot.ticks.len()
948                );
949                let block_position = snapshot.block_position.clone();
950                profiler.restore_from_snapshot(snapshot)?;
951                log::info!("Restored profiler from snapshot");
952                Some(block_position)
953            }
954            _ => {
955                log::info!("No valid snapshot found, processing from beginning");
956                None
957            }
958        };
959
960        // If we don't have never synced pool events, proceed with faster
961        // construction of pool profiler from hypersync and RPC, where we
962        // dont need syncing of pool events and fetching it from database
963        if self
964            .cache
965            .database
966            .as_ref()
967            .unwrap()
968            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.pool_identifier)
969            .await?
970            .is_none()
971        {
972            return self
973                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
974                .await;
975        }
976
977        // Sync the pool events before bootstrapping of pool profiler
978        if let Err(e) = self
979            .sync_pool_events(&pool.dex.name, pool.pool_identifier, None, None, false)
980            .await
981        {
982            log::error!("Failed to sync pool events for snapshot request: {e}");
983        }
984
985        if !profiler.is_initialized {
986            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
987                profiler.initialize(initial_sqrt_price_x96);
988            } else {
989                anyhow::bail!(
990                    "Pool is not initialized and it doesn't contain initial price, cannot bootstrap profiler"
991                );
992            }
993        }
994
995        let from_block = from_position
996            .as_ref()
997            .map_or(profiler.pool.creation_block, |block_position| {
998                block_position.number
999            });
1000        let to_block = self.hypersync_client.current_block().await;
1001        let total_blocks = to_block.saturating_sub(from_block) + 1;
1002
1003        // Enable embedded profiler reporting
1004        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1005
1006        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
1007            pool.chain.clone(),
1008            pool.dex.clone(),
1009            pool.instrument_id,
1010            pool.pool_identifier,
1011            from_position.clone(),
1012        );
1013
1014        while let Some(result) = stream.next().await {
1015            match result {
1016                Ok(event) => {
1017                    profiler.process(&event)?;
1018                }
1019                Err(e) => log::error!("Error processing event: {e}"),
1020            }
1021        }
1022
1023        profiler.finalize_reporting();
1024
1025        Ok((profiler, false))
1026    }
1027
1028    /// Constructs a pool profiler by fetching events directly from HyperSync RPC.
1029    ///
1030    /// This method is used when the pool has never been synced to the database. It streams
1031    /// liquidity events (mints, burns) directly from HyperSync and processes them
1032    /// to build up the profiler's state in real-time. After processing all events, it
1033    /// restores the profiler from the current on-chain state with the provided ticks and positions
1034    ///
1035    /// # Returns
1036    ///
1037    /// Returns a tuple of:
1038    /// - `PoolProfiler`: The hydrated profiler with state built from events
1039    /// - `bool`: Always `true` to indicate the profiler state was valid, and it was constructed from RPC
1040    ///
1041    /// # Errors
1042    ///
1043    /// Returns an error if:
1044    /// - Event streaming from HyperSync fails
1045    /// - Event parsing or processing fails
1046    /// - DEX configuration is invalid
1047    async fn construct_pool_profiler_from_hypersync_rpc(
1048        &self,
1049        mut profiler: PoolProfiler,
1050        from_position: Option<BlockPosition>,
1051    ) -> anyhow::Result<(PoolProfiler, bool)> {
1052        log::info!("Constructing pool profiler from hypersync stream and RPC final state querying");
1053        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
1054        let mint_event_signature = dex_extended.mint_created_event.as_ref();
1055        let burn_event_signature = dex_extended.burn_created_event.as_ref();
1056        let initialize_event_signature =
1057            if let Some(initialize_event) = &dex_extended.initialize_event {
1058                initialize_event.as_ref()
1059            } else {
1060                anyhow::bail!(
1061                    "DEX {} does not have initialize event set.",
1062                    &profiler.pool.dex.name
1063                );
1064            };
1065        let mint_sig_bytes = hex::decode(
1066            mint_event_signature
1067                .strip_prefix("0x")
1068                .unwrap_or(mint_event_signature),
1069        )?;
1070        let burn_sig_bytes = hex::decode(
1071            burn_event_signature
1072                .strip_prefix("0x")
1073                .unwrap_or(burn_event_signature),
1074        )?;
1075        let initialize_sig_bytes = hex::decode(
1076            initialize_event_signature
1077                .strip_prefix("0x")
1078                .unwrap_or(initialize_event_signature),
1079        )?;
1080
1081        let from_block = from_position.map_or(profiler.pool.creation_block, |block_position| {
1082            block_position.number
1083        });
1084        let to_block = self.hypersync_client.current_block().await;
1085        let total_blocks = to_block.saturating_sub(from_block) + 1;
1086
1087        log::info!(
1088            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
1089            profiler.pool.address,
1090            from_block.separate_with_commas(),
1091            to_block.separate_with_commas(),
1092            total_blocks.separate_with_commas()
1093        );
1094
1095        // Enable embedded profiler reporting
1096        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);
1097
1098        let pool_events_stream = self
1099            .hypersync_client
1100            .request_contract_events_stream(
1101                from_block,
1102                None,
1103                &profiler.pool.address,
1104                vec![
1105                    mint_event_signature,
1106                    burn_event_signature,
1107                    initialize_event_signature,
1108                ],
1109            )
1110            .await;
1111        tokio::pin!(pool_events_stream);
1112
1113        while let Some(log) = pool_events_stream.next().await {
1114            let event_sig_bytes = extract_event_signature_bytes(&log)?;
1115
1116            if event_sig_bytes == initialize_sig_bytes {
1117                let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
1118                profiler.initialize(initialize_event.sqrt_price_x96);
1119                self.cache
1120                    .database
1121                    .as_ref()
1122                    .unwrap()
1123                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
1124                    .await?;
1125            } else if event_sig_bytes == mint_sig_bytes {
1126                let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
1127                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
1128                    Ok(liquidity_update) => {
1129                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1130                    }
1131                    Err(e) => log::error!("Failed to process mint event: {e}"),
1132                }
1133            } else if event_sig_bytes == burn_sig_bytes {
1134                let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
1135                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
1136                    Ok(liquidity_update) => {
1137                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
1138                    }
1139                    Err(e) => log::error!("Failed to process burn event: {e}"),
1140                }
1141            } else {
1142                let event_signature = hex::encode(event_sig_bytes);
1143                log::error!(
1144                    "Unexpected event signature in bootstrap_latest_pool_profiler: {event_signature} for log {log:?}"
1145                );
1146            }
1147        }
1148
1149        profiler.finalize_reporting();
1150
1151        // Hydrate from the current RPC state
1152        match self.get_on_chain_snapshot(&profiler).await {
1153            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
1154            Err(e) => log::error!(
1155                "Failed to restore from on-chain snapshot: {e}. Sending not hydrated state to client."
1156            ),
1157        }
1158
1159        Ok((profiler, true))
1160    }
1161
1162    /// Validates a pool profiler's state against on-chain data for accuracy verification.
1163    ///
1164    /// This method performs integrity checking by comparing the profiler's internal state
1165    /// (positions, ticks, liquidity) with the actual on-chain smart contract state. For UniswapV3
1166    /// pools, it fetches current on-chain data and verifies that the profiler's tracked state matches.
1167    /// If validation succeeds or is bypassed, the snapshot is marked as valid in the database.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if database operations fail when marking the snapshot as valid.
1172    ///
1173    /// # Panics
1174    ///
1175    /// Panics if the profiler does not have a last_processed_event when already_validated is true.
1176    pub async fn check_snapshot_validity(
1177        &self,
1178        profiler: &PoolProfiler,
1179        already_validated: bool,
1180    ) -> anyhow::Result<bool> {
1181        // Determine validity and get block position for marking
1182        let (is_valid, block_position) = if already_validated {
1183            // Skip RPC call - profiler was validated during construction from RPC
1184            log::info!("Snapshot already validated from RPC, skipping on-chain comparison");
1185            let last_event = profiler
1186                .last_processed_event
1187                .clone()
1188                .expect("Profiler should have last_processed_event");
1189            (true, last_event)
1190        } else {
1191            // Fetch on-chain state and compare
1192            match self.get_on_chain_snapshot(profiler).await {
1193                Ok(on_chain_snapshot) => {
1194                    log::info!("Comparing profiler state with on-chain state...");
1195                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
1196                    if !valid {
1197                        log::error!(
1198                            "Pool profiler state does NOT match on-chain smart contract state"
1199                        );
1200                    }
1201                    (valid, on_chain_snapshot.block_position)
1202                }
1203                Err(e) => {
1204                    log::error!("Failed to check snapshot validity: {e}");
1205                    return Ok(false);
1206                }
1207            }
1208        };
1209
1210        // Mark snapshot as valid in database if validation passed
1211        if is_valid && let Some(cache_database) = &self.cache.database {
1212            cache_database
1213                .mark_pool_snapshot_valid(
1214                    profiler.pool.chain.chain_id,
1215                    &profiler.pool.pool_identifier,
1216                    block_position.number,
1217                    block_position.transaction_index,
1218                    block_position.log_index,
1219                )
1220                .await?;
1221            log::info!("Marked pool profiler snapshot as valid");
1222        }
1223
1224        Ok(is_valid)
1225    }
1226
1227    /// Fetches current on-chain pool state at the last processed block.
1228    ///
1229    /// Queries the pool smart contract to retrieve active tick liquidity and position data,
1230    /// using the profiler's active positions and last processed block number.
1231    /// Used for profiler state restoration after bootstrapping and validation.
1232    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
1233        if profiler.pool.dex.name == DexType::UniswapV3 {
1234            let last_processed_event = profiler
1235                .last_processed_event
1236                .clone()
1237                .expect("We expect at least one processed event in the pool");
1238            let on_chain_snapshot = self
1239                .univ3_pool
1240                .fetch_snapshot(
1241                    &profiler.pool.address,
1242                    profiler.pool.instrument_id,
1243                    profiler.get_active_tick_values().as_slice(),
1244                    &profiler.get_all_position_keys(),
1245                    last_processed_event,
1246                )
1247                .await?;
1248
1249            Ok(on_chain_snapshot)
1250        } else {
1251            anyhow::bail!(
1252                "Fetching on-chain snapshot for Dex protocol {} is not supported yet.",
1253                profiler.pool.dex.name
1254            )
1255        }
1256    }
1257
1258    /// Replays historical events for a pool to hydrate its profiler state.
1259    ///
1260    /// Streams all historical swap, liquidity, and fee collect events from the database
1261    /// and sends them through the normal data event pipeline to build up pool profiler state.
1262    ///
1263    /// # Errors
1264    ///
1265    /// Returns an error if database streaming fails or event processing fails.
1266    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
1267        if let Some(database) = &self.cache.database {
1268            log::info!(
1269                "Replaying historical events for pool {} to hydrate profiler",
1270                pool.instrument_id
1271            );
1272
1273            let mut event_stream = database.stream_pool_events(
1274                self.chain.clone(),
1275                dex.clone(),
1276                pool.instrument_id,
1277                pool.pool_identifier,
1278                None,
1279            );
1280            let mut event_count = 0;
1281
1282            while let Some(event_result) = event_stream.next().await {
1283                match event_result {
1284                    Ok(event) => {
1285                        let data_event = match event {
1286                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
1287                            DexPoolData::LiquidityUpdate(update) => {
1288                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
1289                            }
1290                            DexPoolData::FeeCollect(collect) => {
1291                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
1292                            }
1293                            DexPoolData::Flash(flash) => {
1294                                DataEvent::DeFi(DefiData::PoolFlash(flash))
1295                            }
1296                        };
1297                        self.send_data(data_event);
1298                        event_count += 1;
1299                    }
1300                    Err(e) => {
1301                        log::error!("Error streaming event for pool {}: {e}", pool.instrument_id);
1302                    }
1303                }
1304            }
1305
1306            log::info!(
1307                "Replayed {event_count} historical events for pool {}",
1308                pool.instrument_id
1309            );
1310        } else {
1311            log::debug!(
1312                "No database available, skipping event replay for pool {}",
1313                pool.instrument_id
1314            );
1315        }
1316
1317        Ok(())
1318    }
1319
1320    /// Determines the starting block for syncing operations.
1321    fn determine_from_block(&self) -> u64 {
1322        self.config
1323            .from_block
1324            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
1325    }
1326
1327    /// Retrieves extended DEX information for a registered DEX.
1328    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
1329        if !self.cache.get_registered_dexes().contains(dex_id) {
1330            anyhow::bail!("DEX {dex_id} is not registered in the data client");
1331        }
1332
1333        match get_dex_extended(self.chain.name, dex_id) {
1334            Some(dex) => Ok(dex),
1335            None => anyhow::bail!("Dex {dex_id} doesn't exist for chain {}", self.chain.name),
1336        }
1337    }
1338
1339    /// Retrieves a pool from the cache by its address.
1340    ///
1341    /// # Errors
1342    ///
1343    /// Returns an error if the pool is not registered in the cache.
1344    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> anyhow::Result<&SharedPool> {
1345        match self.cache.get_pool(pool_identifier) {
1346            Some(pool) => Ok(pool),
1347            None => anyhow::bail!("Pool {pool_identifier} is not registered"),
1348        }
1349    }
1350
1351    /// Sends a data event to all subscribers through the data channel.
1352    pub fn send_data(&self, data: DataEvent) {
1353        if let Some(data_tx) = &self.data_tx {
1354            log::debug!("Sending {data}");
1355
1356            if let Err(e) = data_tx.send(data) {
1357                log::error!("Failed to send data: {e}");
1358            }
1359        } else {
1360            log::error!("No data event channel for sending data");
1361        }
1362    }
1363
1364    /// Disconnects all active connections and cleanup resources.
1365    ///
1366    /// This method should be called when shutting down the client to ensure
1367    /// proper cleanup of network connections and background tasks.
1368    pub async fn disconnect(&mut self) {
1369        self.hypersync_client.disconnect().await;
1370    }
1371}