nautilus_blockchain/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Caching layer for blockchain entities and domain objects.
17//!
18//! This module provides an in-memory cache with optional PostgreSQL persistence for storing
19//! and retrieving blockchain-related data such as blocks, tokens, pools, swaps, and other
20//! DeFi protocol events.
21
22use std::{
23    collections::{BTreeMap, HashMap, HashSet},
24    sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30    Block, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
31    SharedPool, Token,
32    data::{PoolFeeCollect, PoolFlash},
33    pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
34    tick_map::tick::PoolTick,
35};
36use sqlx::postgres::PgConnectOptions;
37
38use crate::{
39    cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
40    events::initialize::InitializeEvent,
41};
42
43pub mod consistency;
44pub mod copy;
45pub mod database;
46pub mod rows;
47pub mod types;
48
/// Provides caching functionality for various blockchain domain objects.
///
/// All lookups are served from in-memory maps; when `database` is set, writes are
/// additionally persisted to PostgreSQL (see the `add_*` methods on the impl).
#[derive(Debug)]
pub struct BlockchainCache {
    /// The blockchain chain this cache is associated with.
    chain: SharedChain,
    /// Map of block numbers to their corresponding timestamps.
    /// `BTreeMap` keeps entries ordered by block number.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Map of DEX identifiers to their corresponding DEX objects.
    dexes: HashMap<DexType, SharedDex>,
    /// Map of token addresses to their corresponding `Token` objects.
    tokens: HashMap<Address, Token>,
    /// Cached set of invalid token addresses that failed validation or processing.
    invalid_tokens: HashSet<Address>,
    /// Map of pool identifiers to their corresponding `Pool` objects.
    pools: HashMap<PoolIdentifier, SharedPool>,
    /// Optional database connection for persistent storage; `None` means in-memory only.
    pub database: Option<BlockchainCacheDatabase>,
}
67
impl BlockchainCache {
    /// Creates a new in-memory blockchain cache for the specified chain.
    ///
    /// The cache starts empty and without persistence; call
    /// `initialize_database` to attach a PostgreSQL backend.
    #[must_use]
    pub fn new(chain: SharedChain) -> Self {
        Self {
            chain,
            block_timestamps: BTreeMap::new(),
            dexes: HashMap::new(),
            tokens: HashMap::new(),
            invalid_tokens: HashSet::new(),
            pools: HashMap::new(),
            database: None,
        }
    }
82
83    /// Returns the highest continuous block number currently cached, if any.
84    pub async fn get_cache_block_consistency_status(
85        &self,
86    ) -> Option<CachedBlocksConsistencyStatus> {
87        let database = self.database.as_ref()?;
88        database
89            .get_block_consistency_status(&self.chain)
90            .await
91            .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
92            .ok()
93    }
94
95    /// Returns the earliest block number where any DEX in the cache was created on the blockchain.
96    #[must_use]
97    pub fn min_dex_creation_block(&self) -> Option<u64> {
98        self.dexes
99            .values()
100            .map(|dex| dex.factory_creation_block)
101            .min()
102    }
103
    /// Returns the timestamp for the specified block number if it exists in the cache.
    ///
    /// Only consults the in-memory map; the database is never queried here.
    #[must_use]
    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
        self.block_timestamps.get(&block_number)
    }
109
110    /// Initializes the database connection for persistent storage.
111    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
112        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
113        self.database = Some(database);
114    }
115
116    /// Toggles performance optimization settings in the database.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the database is not initialized or the operation fails.
121    pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
122        if let Some(database) = &self.database {
123            database.toggle_perf_sync_settings(enable).await
124        } else {
125            tracing::warn!("Database not initialized, skipping performance settings toggle");
126            Ok(())
127        }
128    }
129
    /// Initializes the chain by seeding it in the database and creating necessary partitions.
    ///
    /// This method sets up the blockchain chain in the database, creates block and token
    /// partitions for optimal performance, and loads existing tokens into the cache.
    ///
    /// Errors are logged rather than returned: a seeding failure aborts the DB setup
    /// entirely, while partition-creation failures are tolerated and setup continues.
    pub async fn initialize_chain(&mut self) {
        // Seed target adapter chain in database
        if let Some(database) = &self.database {
            // Seeding failure: bail out of all database setup (cache keeps running in-memory).
            // NOTE(review): token loading below is also skipped in this case — confirm intended.
            if let Err(e) = database.seed_chain(&self.chain).await {
                tracing::error!(
                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
                );
                return;
            }
            tracing::info!("Chain seeded in the database");

            // Partition creation is best-effort: a failure is logged but does not abort setup.
            match database.create_block_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing block partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }

            match database.create_token_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing token partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }
        }

        // Populate the in-memory token cache from the database (no-op when database is None).
        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }
    }
166
167    /// Connects to the database and loads initial data.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if database seeding, token loading, or block loading fails.
172    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
173        tracing::debug!("Connecting and loading from_block {from_block}");
174
175        if let Err(e) = self.load_tokens().await {
176            tracing::error!("Error loading tokens from the database: {e}");
177        }
178
179        // TODO disable block syncing for now as we don't have timestamps yet configured
180        // if let Err(e) = self.load_blocks(from_block).await {
181        //     log::error!("Error loading blocks from database: {e}");
182        // }
183
184        Ok(())
185    }
186
187    /// Loads tokens from the database into the in-memory cache.
188    async fn load_tokens(&mut self) -> anyhow::Result<()> {
189        if let Some(database) = &self.database {
190            let (tokens, invalid_tokens) = tokio::try_join!(
191                database.load_tokens(self.chain.clone()),
192                database.load_invalid_token_addresses(self.chain.chain_id)
193            )?;
194
195            tracing::info!(
196                "Loading {} valid tokens and {} invalid tokens from cache database",
197                tokens.len(),
198                invalid_tokens.len()
199            );
200
201            self.tokens
202                .extend(tokens.into_iter().map(|token| (token.address, token)));
203            self.invalid_tokens.extend(invalid_tokens);
204        }
205        Ok(())
206    }
207
    /// Loads DEX exchange pools from the database into the in-memory cache.
    ///
    /// Pools whose tokens are missing from the token cache, or whose stored pool
    /// identifier fails to parse, are logged and skipped rather than failing the
    /// whole load. Call `load_tokens` first so token lookups can succeed.
    ///
    /// Returns the loaded pools.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX has not been registered or if database operations fail.
    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
        let mut loaded_pools = Vec::new();

        if let Some(database) = &self.database {
            // The DEX must already be registered via `add_dex` so pools can reference it.
            let dex = self
                .get_dex(dex_id)
                .ok_or_else(|| anyhow::anyhow!("DEX {dex_id:?} has not been registered"))?;
            let pool_rows = database
                .load_pools(self.chain.clone(), &dex_id.to_string())
                .await?;
            tracing::info!(
                "Loading {} pools for DEX {} from cache database",
                pool_rows.len(),
                dex_id,
            );

            for pool_row in pool_rows {
                // Resolve both tokens from the in-memory cache; skip the pool if either is missing.
                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token0_address
                    );
                    continue;
                };

                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token1_address
                    );
                    continue;
                };

                // Construct pool from row data and cached tokens
                let Some(pool_identifier) = pool_row.pool_identifier.parse().ok() else {
                    tracing::error!(
                        "Invalid pool identifier '{}' in database for pool {}, skipping",
                        pool_row.pool_identifier,
                        pool_row.address
                    );
                    continue;
                };
                let mut pool = Pool::new(
                    self.chain.clone(),
                    dex.clone(),
                    pool_row.address,
                    pool_identifier,
                    pool_row.creation_block as u64,
                    token0.clone(),
                    token1.clone(),
                    pool_row.fee.map(|fee| fee as u32),
                    pool_row
                        .tick_spacing
                        .map(|tick_spacing| tick_spacing as u32),
                    UnixNanos::default(), // TODO use default for now
                );

                // Set hooks if available
                if let Some(ref hook_address_str) = pool_row.hook_address
                    && let Ok(hooks) = hook_address_str.parse()
                {
                    pool.set_hooks(hooks);
                }

                // Initialize pool with initial values if available
                // (requires both the stored sqrt price to parse and an initial tick to be present).
                if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96
                    && let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse()
                    && let Some(initial_tick) = pool_row.initial_tick
                {
                    pool.initialize(initial_sqrt_price_x96, initial_tick);
                }

                // Add pool to cache and loaded pools list
                loaded_pools.push(pool.clone());
                self.pools.insert(pool.pool_identifier, Arc::new(pool));
            }
        }
        Ok(loaded_pools)
    }
304
305    /// Loads block timestamps from the database starting `from_block` number
306    /// into the in-memory cache.
307    #[allow(dead_code)]
308    async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
309        if let Some(database) = &self.database {
310            let block_timestamps = database
311                .load_block_timestamps(self.chain.clone(), from_block)
312                .await?;
313
314            // Verify block number sequence consistency
315            if !block_timestamps.is_empty() {
316                let first = block_timestamps.first().unwrap().number;
317                let last = block_timestamps.last().unwrap().number;
318                let expected_len = (last - first + 1) as usize;
319                if block_timestamps.len() != expected_len {
320                    anyhow::bail!(
321                        "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
322                        block_timestamps.len()
323                    );
324                }
325            }
326
327            if block_timestamps.is_empty() {
328                tracing::info!("No blocks found in database");
329                return Ok(());
330            }
331
332            tracing::info!(
333                "Loading {} blocks timestamps from the cache database with last block number {}",
334                block_timestamps.len(),
335                block_timestamps.last().unwrap().number,
336            );
337            for block in block_timestamps {
338                self.block_timestamps.insert(block.number, block.timestamp);
339            }
340        }
341        Ok(())
342    }
343
344    /// Adds a block to the cache and persists it to the database if available.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if adding the block to the database fails.
349    pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
350        if let Some(database) = &self.database {
351            database.add_block(self.chain.chain_id, &block).await?;
352        }
353        self.block_timestamps.insert(block.number, block.timestamp);
354        Ok(())
355    }
356
357    /// Adds multiple blocks to the cache and persists them to the database in batch if available.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if adding the blocks to the database fails.
362    pub async fn add_blocks_batch(
363        &mut self,
364        blocks: Vec<Block>,
365        use_copy_command: bool,
366    ) -> anyhow::Result<()> {
367        if blocks.is_empty() {
368            return Ok(());
369        }
370
371        if let Some(database) = &self.database {
372            if use_copy_command {
373                database
374                    .add_blocks_copy(self.chain.chain_id, &blocks)
375                    .await?;
376            } else {
377                database
378                    .add_blocks_batch(self.chain.chain_id, &blocks)
379                    .await?;
380            }
381        }
382
383        // Update in-memory cache
384        for block in blocks {
385            self.block_timestamps.insert(block.number, block.timestamp);
386        }
387
388        Ok(())
389    }
390
391    /// Adds a DEX to the cache with the specified identifier.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if adding the DEX to the database fails.
396    pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
397        tracing::info!("Adding dex {} to the cache", dex.name);
398
399        if let Some(database) = &self.database {
400            database.add_dex(dex.clone()).await?;
401        }
402
403        self.dexes.insert(dex.name, dex);
404        Ok(())
405    }
406
407    /// Adds a liquidity pool/pair to the cache.
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if adding the pool to the database fails.
412    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
413        if let Some(database) = &self.database {
414            database.add_pool(&pool).await?;
415        }
416
417        self.pools.insert(pool.pool_identifier, Arc::new(pool));
418        Ok(())
419    }
420
421    /// Adds multiple pools to the cache and persists them to the database in batch if available.
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if adding the pools to the database fails.
426    pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
427        if pools.is_empty() {
428            return Ok(());
429        }
430
431        if let Some(database) = &self.database {
432            database.add_pools_copy(self.chain.chain_id, &pools).await?;
433        }
434        self.pools.extend(
435            pools
436                .into_iter()
437                .map(|pool| (pool.pool_identifier, Arc::new(pool))),
438        );
439
440        Ok(())
441    }
442
443    /// Adds a [`Token`] to the cache.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if adding the token to the database fails.
448    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
449        if let Some(database) = &self.database {
450            database.add_token(&token).await?;
451        }
452        self.tokens.insert(token.address, token);
453        Ok(())
454    }
455
456    /// Adds multiple tokens to the cache and persists them to the database in batch if available.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if adding the tokens to the database fails.
461    pub async fn add_tokens_batch(&mut self, tokens: Vec<Token>) -> anyhow::Result<()> {
462        if tokens.is_empty() {
463            return Ok(());
464        }
465
466        if let Some(database) = &self.database {
467            database
468                .add_tokens_copy(self.chain.chain_id, &tokens)
469                .await?;
470        }
471
472        self.tokens
473            .extend(tokens.into_iter().map(|token| (token.address, token)));
474
475        Ok(())
476    }
477
    /// Updates the in-memory token cache without persisting to the database.
    ///
    /// Replaces any existing entry for the same address.
    pub fn insert_token_in_memory(&mut self, token: Token) {
        self.tokens.insert(token.address, token);
    }
482
    /// Marks a token address as invalid in the in-memory cache without persisting to the database.
    ///
    /// Idempotent: inserting an already-known address is a no-op.
    pub fn insert_invalid_token_in_memory(&mut self, address: Address) {
        self.invalid_tokens.insert(address);
    }
487
488    /// Adds an invalid token address with associated error information to the cache.
489    ///
490    /// # Errors
491    ///
492    /// Returns an error if adding the invalid token to the database fails.
493    pub async fn add_invalid_token(
494        &mut self,
495        address: Address,
496        error_string: &str,
497    ) -> anyhow::Result<()> {
498        if let Some(database) = &self.database {
499            database
500                .add_invalid_token(self.chain.chain_id, &address, error_string)
501                .await?;
502        }
503        self.invalid_tokens.insert(address);
504        Ok(())
505    }
506
507    /// Adds a [`PoolSwap`] to the cache database if available.
508    ///
509    /// # Errors
510    ///
511    /// Returns an error if adding the swap to the database fails.
512    pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
513        if let Some(database) = &self.database {
514            database.add_swap(self.chain.chain_id, swap).await?;
515        }
516
517        Ok(())
518    }
519
520    /// Adds a [`PoolLiquidityUpdate`] to the cache database if available.
521    ///
522    /// # Errors
523    ///
524    /// Returns an error if adding the liquidity update to the database fails.
525    pub async fn add_liquidity_update(
526        &self,
527        liquidity_update: &PoolLiquidityUpdate,
528    ) -> anyhow::Result<()> {
529        if let Some(database) = &self.database {
530            database
531                .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
532                .await?;
533        }
534
535        Ok(())
536    }
537
538    /// Adds multiple [`PoolSwap`]s to the cache database in a single batch operation if available.
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if adding the swaps to the database fails.
543    pub async fn add_pool_swaps_batch(
544        &self,
545        swaps: &[PoolSwap],
546        use_copy_command: bool,
547    ) -> anyhow::Result<()> {
548        if let Some(database) = &self.database {
549            if use_copy_command {
550                database
551                    .add_pool_swaps_copy(self.chain.chain_id, swaps)
552                    .await?;
553            } else {
554                database
555                    .add_pool_swaps_batch(self.chain.chain_id, swaps)
556                    .await?;
557            }
558        }
559
560        Ok(())
561    }
562
563    /// Adds multiple [`PoolLiquidityUpdate`]s to the cache database in a single batch operation if available.
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if adding the liquidity updates to the database fails.
568    pub async fn add_pool_liquidity_updates_batch(
569        &self,
570        updates: &[PoolLiquidityUpdate],
571        use_copy_command: bool,
572    ) -> anyhow::Result<()> {
573        if let Some(database) = &self.database {
574            if use_copy_command {
575                database
576                    .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
577                    .await?;
578            } else {
579                database
580                    .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
581                    .await?;
582            }
583        }
584
585        Ok(())
586    }
587
588    /// Adds a batch of pool fee collect events to the cache.
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if adding the fee collects to the database fails.
593    pub async fn add_pool_fee_collects_batch(
594        &self,
595        collects: &[PoolFeeCollect],
596        use_copy_command: bool,
597    ) -> anyhow::Result<()> {
598        if let Some(database) = &self.database {
599            if use_copy_command {
600                database
601                    .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
602                    .await?;
603            } else {
604                database
605                    .add_pool_collects_batch(self.chain.chain_id, collects)
606                    .await?;
607            }
608        }
609
610        Ok(())
611    }
612
613    /// Adds a batch of pool flash events to the cache.
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if adding the flash events to the database fails.
618    pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
619        if let Some(database) = &self.database {
620            database
621                .add_pool_flash_batch(self.chain.chain_id, flash_events)
622                .await?;
623        }
624
625        Ok(())
626    }
627
    /// Adds a pool snapshot to the cache database.
    ///
    /// This method saves the complete snapshot including:
    /// - Pool state and analytics (pool_snapshot table)
    /// - All positions at this snapshot (pool_position table)
    /// - All ticks at this snapshot (pool_tick table)
    ///
    /// The snapshot row is written before positions and ticks; do not reorder
    /// these writes, as the child tables reference the snapshot (foreign keys).
    /// No-op when no database is attached.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the snapshot to the database fails.
    pub async fn add_pool_snapshot(
        &self,
        dex: &DexType,
        pool_identifier: &PoolIdentifier,
        snapshot: &PoolSnapshot,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            // Save snapshot first (required for foreign key constraints)
            database
                .add_pool_snapshot(self.chain.chain_id, dex, pool_identifier, snapshot)
                .await?;

            // Pair every position with the pool identifier; positions are cloned
            // because the batch API takes owned values.
            let positions: Vec<(PoolIdentifier, PoolPosition)> = snapshot
                .positions
                .iter()
                .map(|pos| (*pool_identifier, pos.clone()))
                .collect();
            if !positions.is_empty() {
                database
                    .add_pool_positions_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &positions,
                    )
                    .await?;
            }

            // Ticks are passed by reference; no clone needed.
            let ticks: Vec<(PoolIdentifier, &PoolTick)> = snapshot
                .ticks
                .iter()
                .map(|tick| (*pool_identifier, tick))
                .collect();
            if !ticks.is_empty() {
                database
                    .add_pool_ticks_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &ticks,
                    )
                    .await?;
            }
        }

        Ok(())
    }
687
688    /// Updates the initial price and tick for a pool.
689    ///
690    /// # Errors
691    ///
692    /// Returns an error if the database update fails.
693    pub async fn update_pool_initialize_price_tick(
694        &mut self,
695        initialize_event: &InitializeEvent,
696    ) -> anyhow::Result<()> {
697        if let Some(database) = &self.database {
698            database
699                .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
700                .await?;
701        }
702
703        // Update the cached pool if it exists
704        let pool_identifier = initialize_event.pool_identifier;
705        if let Some(cached_pool) = self.pools.get(&pool_identifier) {
706            let mut updated_pool = (**cached_pool).clone();
707            updated_pool.initialize(initialize_event.sqrt_price_x96, initialize_event.tick);
708
709            self.pools.insert(pool_identifier, Arc::new(updated_pool));
710        }
711
712        Ok(())
713    }
714
715    /// Returns a reference to the `DexExtended` associated with the given name.
716    #[must_use]
717    pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
718        self.dexes.get(dex_id).cloned()
719    }
720
721    /// Returns a list of registered `DexType` in the cache.
722    #[must_use]
723    pub fn get_registered_dexes(&self) -> HashSet<DexType> {
724        self.dexes.keys().copied().collect()
725    }
726
    /// Returns a reference to the pool associated with the given pool identifier.
    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> Option<&SharedPool> {
        self.pools.get(pool_identifier)
    }
732
    /// Returns a reference to the `Token` associated with the given address.
    ///
    /// Only consults the in-memory cache; the database is never queried here.
    #[must_use]
    pub fn get_token(&self, address: &Address) -> Option<&Token> {
        self.tokens.get(address)
    }
738
    /// Checks if a token address is marked as invalid in the cache.
    ///
    /// Returns `true` if the address was previously recorded as invalid due to
    /// validation or processing failures.
    #[must_use]
    pub fn is_invalid_token(&self, address: &Address) -> bool {
        self.invalid_tokens.contains(address)
    }
747
748    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
749    ///
750    /// # Errors
751    ///
752    /// Returns an error if the database operation fails.
753    pub async fn update_dex_last_synced_block(
754        &self,
755        dex: &DexType,
756        block_number: u64,
757    ) -> anyhow::Result<()> {
758        if let Some(database) = &self.database {
759            database
760                .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
761                .await
762        } else {
763            Ok(())
764        }
765    }
766
767    /// Updates the last synced block number for a pool.
768    ///
769    /// # Errors
770    ///
771    /// Returns an error if the database update fails.
772    pub async fn update_pool_last_synced_block(
773        &self,
774        dex: &DexType,
775        pool_identifier: &PoolIdentifier,
776        block_number: u64,
777    ) -> anyhow::Result<()> {
778        if let Some(database) = &self.database {
779            database
780                .update_pool_last_synced_block(
781                    self.chain.chain_id,
782                    dex,
783                    pool_identifier,
784                    block_number,
785                )
786                .await
787        } else {
788            Ok(())
789        }
790    }
791
792    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the database query fails.
797    pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
798        if let Some(database) = &self.database {
799            database
800                .get_dex_last_synced_block(self.chain.chain_id, dex)
801                .await
802        } else {
803            Ok(None)
804        }
805    }
806
807    /// Retrieves the last synced block number for a pool.
808    ///
809    /// # Errors
810    ///
811    /// Returns an error if the database query fails.
812    pub async fn get_pool_last_synced_block(
813        &self,
814        dex: &DexType,
815        pool_identifier: &PoolIdentifier,
816    ) -> anyhow::Result<Option<u64>> {
817        if let Some(database) = &self.database {
818            database
819                .get_pool_last_synced_block(self.chain.chain_id, dex, pool_identifier)
820                .await
821        } else {
822            Ok(None)
823        }
824    }
825
826    /// Retrieves the maximum block number across all pool event tables for a given pool.
827    ///
828    /// # Errors
829    ///
830    /// Returns an error if any of the database queries fail.
831    pub async fn get_pool_event_tables_last_block(
832        &self,
833        pool_identifier: &PoolIdentifier,
834    ) -> anyhow::Result<Option<u64>> {
835        if let Some(database) = &self.database {
836            let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
837                database.get_table_last_block(
838                    self.chain.chain_id,
839                    "pool_swap_event",
840                    pool_identifier
841                ),
842                database.get_table_last_block(
843                    self.chain.chain_id,
844                    "pool_liquidity_event",
845                    pool_identifier
846                ),
847                database.get_table_last_block(
848                    self.chain.chain_id,
849                    "pool_collect_event",
850                    pool_identifier
851                ),
852            )?;
853
854            let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
855                .into_iter()
856                .flatten()
857                .max();
858            Ok(max_block)
859        } else {
860            Ok(None)
861        }
862    }
863}