nautilus_blockchain/cache/mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Caching layer for blockchain entities and domain objects.
//!
//! This module provides an in-memory cache with optional PostgreSQL persistence for storing
//! and retrieving blockchain-related data such as blocks, tokens, pools, swaps, and other
//! DeFi protocol events.
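//!
//! A minimal usage sketch (illustrative only; how the chain, DEX, and token values are obtained
//! depends on the calling adapter, so they are assumed to be in scope here):
//!
//! ```ignore
//! use nautilus_blockchain::cache::BlockchainCache;
//!
//! // Purely in-memory cache; attach PostgreSQL later via `initialize_database`.
//! let mut cache = BlockchainCache::new(chain);
//!
//! // Register a DEX, then cache a token and look it up by address.
//! cache.add_dex(dex).await?;
//! cache.add_token(token).await?;
//! let cached = cache.get_token(&token_address);
//! ```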

use std::{
    collections::{BTreeMap, HashMap, HashSet},
    sync::Arc,
};

use alloy::primitives::Address;
use nautilus_core::UnixNanos;
use nautilus_model::defi::{
    Block, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, SharedPool, Token,
    data::{PoolFeeCollect, PoolFlash},
    pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
    tick_map::tick::PoolTick,
};
use sqlx::postgres::PgConnectOptions;

use crate::{
    cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
    events::initialize::InitializeEvent,
};

pub mod consistency;
pub mod copy;
pub mod database;
pub mod rows;
pub mod types;

/// Provides caching functionality for various blockchain domain objects.
#[derive(Debug)]
pub struct BlockchainCache {
    /// The blockchain chain this cache is associated with.
    chain: SharedChain,
    /// Map of block numbers to their corresponding timestamps.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Map of DEX identifiers to their corresponding DEX objects.
    dexes: HashMap<DexType, SharedDex>,
    /// Map of token addresses to their corresponding `Token` objects.
    tokens: HashMap<Address, Token>,
    /// Cached set of invalid token addresses that failed validation or processing.
    invalid_tokens: HashSet<Address>,
    /// Map of pool addresses to their corresponding `Pool` objects.
    pools: HashMap<Address, SharedPool>,
    /// Optional database connection for persistent storage.
    pub database: Option<BlockchainCacheDatabase>,
}

impl BlockchainCache {
    /// Creates a new in-memory blockchain cache for the specified chain.
    #[must_use]
    pub fn new(chain: SharedChain) -> Self {
        Self {
            chain,
            dexes: HashMap::new(),
            tokens: HashMap::new(),
            invalid_tokens: HashSet::new(),
            pools: HashMap::new(),
            block_timestamps: BTreeMap::new(),
            database: None,
        }
    }

    /// Returns the consistency status of the blocks cached in the database, if a database is
    /// configured.
    pub async fn get_cache_block_consistency_status(
        &self,
    ) -> Option<CachedBlocksConsistencyStatus> {
        let database = self.database.as_ref()?;
        database
            .get_block_consistency_status(&self.chain)
            .await
            .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
            .ok()
    }

    /// Returns the earliest block number at which any registered DEX was created on the blockchain.
    #[must_use]
    pub fn min_dex_creation_block(&self) -> Option<u64> {
        self.dexes
            .values()
            .map(|dex| dex.factory_creation_block)
            .min()
    }

    /// Returns the timestamp for the specified block number if it exists in the cache.
    #[must_use]
    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
        self.block_timestamps.get(&block_number)
    }

    /// Initializes the database connection for persistent storage.
    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
        self.database = Some(database);
    }

    /// Toggles performance optimization settings in the database.
    ///
    /// If no database is initialized, this is a no-op and a warning is logged.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database.toggle_perf_sync_settings(enable).await
        } else {
            tracing::warn!("Database not initialized, skipping performance settings toggle");
            Ok(())
        }
    }

    /// Initializes the chain by seeding it in the database and creating the necessary partitions.
    ///
    /// This method sets up the chain record in the database, creates block and token partitions
    /// for optimal performance, and loads existing tokens into the cache.
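    ///
    /// A database-backed setup sketch (the connection parameters shown are illustrative):
    ///
    /// ```ignore
    /// use sqlx::postgres::PgConnectOptions;
    ///
    /// let pg_options = PgConnectOptions::new()
    ///     .host("localhost")
    ///     .port(5432)
    ///     .username("nautilus")
    ///     .database("nautilus");
    ///
    /// cache.initialize_database(pg_options).await;
    /// cache.initialize_chain().await;
    /// ```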
    pub async fn initialize_chain(&mut self) {
        // Seed target adapter chain in database
        if let Some(database) = &self.database {
            if let Err(e) = database.seed_chain(&self.chain).await {
                tracing::error!(
                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
                );
                return;
            }
            tracing::info!("Chain seeded in the database");

            match database.create_block_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing block partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }

            match database.create_token_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing token partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }
        }

        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }
    }

    /// Connects to the database and loads initial data.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`: token-loading failures are logged rather than propagated,
    /// and block loading is temporarily disabled.
    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
        tracing::debug!("Connecting and loading from_block {from_block}");

        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }

        // TODO: Block syncing is disabled for now as block timestamps are not yet configured
        // if let Err(e) = self.load_blocks(from_block).await {
        //     log::error!("Error loading blocks from database: {e}");
        // }

        Ok(())
    }

    /// Loads tokens from the database into the in-memory cache.
    async fn load_tokens(&mut self) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            let (tokens, invalid_tokens) = tokio::try_join!(
                database.load_tokens(self.chain.clone()),
                database.load_invalid_token_addresses(self.chain.chain_id)
            )?;

            tracing::info!(
                "Loading {} valid tokens and {} invalid tokens from cache database",
                tokens.len(),
                invalid_tokens.len()
            );

            self.tokens
                .extend(tokens.into_iter().map(|token| (token.address, token)));
            self.invalid_tokens.extend(invalid_tokens);
        }
        Ok(())
    }

    /// Loads the pools of the specified DEX from the database into the in-memory cache.
    ///
    /// Returns the loaded pools.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX has not been registered or if database operations fail.
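    ///
    /// A minimal sketch, assuming the DEX has already been registered via [`Self::add_dex`]:
    ///
    /// ```ignore
    /// cache.add_dex(dex).await?;
    /// let pools = cache.load_pools(&dex_id).await?;
    /// tracing::info!("Loaded {} pools", pools.len());
    /// ```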
    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
        let mut loaded_pools = Vec::new();

        if let Some(database) = &self.database {
            let dex = self
                .get_dex(dex_id)
                .ok_or_else(|| anyhow::anyhow!("DEX {:?} has not been registered", dex_id))?;
            let pool_rows = database
                .load_pools(self.chain.clone(), &dex_id.to_string())
                .await?;
            tracing::info!(
                "Loading {} pools for DEX {} from cache database",
                pool_rows.len(),
                dex_id,
            );

            for pool_row in pool_rows {
                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token0_address
                    );
                    continue;
                };

                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token1_address
                    );
                    continue;
                };

                // Construct pool from row data and cached tokens
                let mut pool = Pool::new(
                    self.chain.clone(),
                    dex.clone(),
                    pool_row.address,
                    pool_row.creation_block as u64,
                    token0.clone(),
                    token1.clone(),
                    pool_row.fee.map(|fee| fee as u32),
                    pool_row
                        .tick_spacing
                        .map(|tick_spacing| tick_spacing as u32),
                    UnixNanos::default(), // TODO use default for now
                );

                // Initialize pool with initial values if available
                if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96 {
                    if let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse() {
                        pool.initialize(initial_sqrt_price_x96);
                    }
                }

                // Add pool to cache and loaded pools list
                loaded_pools.push(pool.clone());
                self.pools.insert(pool.address, Arc::new(pool));
            }
        }
        Ok(loaded_pools)
    }

    /// Loads block timestamps from the database, starting at `from_block`, into the
    /// in-memory cache.
    #[allow(dead_code, reason = "TODO: Under development")]
    async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            let block_timestamps = database
                .load_block_timestamps(self.chain.clone(), from_block)
                .await?;

            if block_timestamps.is_empty() {
                tracing::info!("No blocks found in database");
                return Ok(());
            }

            // Verify block number sequence consistency
            let first = block_timestamps.first().unwrap().number;
            let last = block_timestamps.last().unwrap().number;
            let expected_len = (last - first + 1) as usize;
            if block_timestamps.len() != expected_len {
                anyhow::bail!(
                    "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
                    block_timestamps.len()
                );
            }

            tracing::info!(
                "Loading {} block timestamps from the cache database with last block number {last}",
                block_timestamps.len(),
            );
            for block in block_timestamps {
                self.block_timestamps.insert(block.number, block.timestamp);
            }
        }
        Ok(())
    }

    /// Adds a block to the cache and persists it to the database if available.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the block to the database fails.
    pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database.add_block(self.chain.chain_id, &block).await?;
        }
        self.block_timestamps.insert(block.number, block.timestamp);
        Ok(())
    }

    /// Adds multiple blocks to the cache and persists them to the database in batch if available.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the blocks to the database fails.
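    ///
    /// A usage sketch; passing `true` routes the write through the PostgreSQL `COPY` path,
    /// while `false` uses batched inserts:
    ///
    /// ```ignore
    /// cache.add_blocks_batch(blocks, true).await?;
    /// ```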
    pub async fn add_blocks_batch(
        &mut self,
        blocks: Vec<Block>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        if let Some(database) = &self.database {
            if use_copy_command {
                database
                    .add_blocks_copy(self.chain.chain_id, &blocks)
                    .await?;
            } else {
                database
                    .add_blocks_batch(self.chain.chain_id, &blocks)
                    .await?;
            }
        }

        // Update in-memory cache
        for block in blocks {
            self.block_timestamps.insert(block.number, block.timestamp);
        }

        Ok(())
    }

    /// Adds a DEX to the cache with the specified identifier.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the DEX to the database fails.
    pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
        tracing::info!("Adding dex {} to the cache", dex.name);

        if let Some(database) = &self.database {
            database.add_dex(dex.clone()).await?;
        }

        self.dexes.insert(dex.name, dex);
        Ok(())
    }

    /// Adds a liquidity pool/pair to the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the pool to the database fails.
    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
        let pool_address = pool.address;
        if let Some(database) = &self.database {
            database.add_pool(&pool).await?;
        }

        self.pools.insert(pool_address, Arc::new(pool));
        Ok(())
    }

    /// Persists multiple pools to the database in a single batch operation if available.
    ///
    /// Note that, unlike [`Self::add_pool`], this does not insert the pools into the in-memory
    /// cache.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the pools to the database fails.
    pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
        if pools.is_empty() {
            return Ok(());
        }

        if let Some(database) = &self.database {
            database.add_pools_batch(&pools).await?;
        }

        Ok(())
    }

    /// Adds a [`Token`] to the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the token to the database fails.
    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database.add_token(&token).await?;
        }
        self.tokens.insert(token.address, token);
        Ok(())
    }

    /// Adds an invalid token address with associated error information to the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the invalid token to the database fails.
    pub async fn add_invalid_token(
        &mut self,
        address: Address,
        error_string: &str,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .add_invalid_token(self.chain.chain_id, &address, error_string)
                .await?;
        }
        self.invalid_tokens.insert(address);
        Ok(())
    }

    /// Adds a [`PoolSwap`] to the cache database if available.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the swap to the database fails.
    pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database.add_swap(self.chain.chain_id, swap).await?;
        }

        Ok(())
    }

    /// Adds a [`PoolLiquidityUpdate`] to the cache database if available.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the liquidity update to the database fails.
    pub async fn add_liquidity_update(
        &self,
        liquidity_update: &PoolLiquidityUpdate,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
                .await?;
        }

        Ok(())
    }

    /// Adds multiple [`PoolSwap`]s to the cache database in a single batch operation if available.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the swaps to the database fails.
    pub async fn add_pool_swaps_batch(
        &self,
        swaps: &[PoolSwap],
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            if use_copy_command {
                database
                    .add_pool_swaps_copy(self.chain.chain_id, swaps)
                    .await?;
            } else {
                database
                    .add_pool_swaps_batch(self.chain.chain_id, swaps)
                    .await?;
            }
        }

        Ok(())
    }

    /// Adds multiple [`PoolLiquidityUpdate`]s to the cache database in a single batch operation if available.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the liquidity updates to the database fails.
    pub async fn add_pool_liquidity_updates_batch(
        &self,
        updates: &[PoolLiquidityUpdate],
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            if use_copy_command {
                database
                    .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
                    .await?;
            } else {
                database
                    .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
                    .await?;
            }
        }

        Ok(())
    }

    /// Adds a batch of pool fee collect events to the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the fee collects to the database fails.
    pub async fn add_pool_fee_collects_batch(
        &self,
        collects: &[PoolFeeCollect],
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            if use_copy_command {
                database
                    .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
                    .await?;
            } else {
                database
                    .add_pool_collects_batch(self.chain.chain_id, collects)
                    .await?;
            }
        }

        Ok(())
    }

    /// Adds a batch of pool flash events to the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the flash events to the database fails.
    pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .add_pool_flash_batch(self.chain.chain_id, flash_events)
                .await?;
        }

        Ok(())
    }

    /// Adds a pool snapshot to the cache database.
    ///
    /// This method saves the complete snapshot including:
    /// - Pool state and analytics (pool_snapshot table)
    /// - All positions at this snapshot (pool_position table)
    /// - All ticks at this snapshot (pool_tick table)
    ///
    /// # Errors
    ///
    /// Returns an error if adding the snapshot to the database fails.
    pub async fn add_pool_snapshot(
        &self,
        pool_address: &Address,
        snapshot: &PoolSnapshot,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            // Save snapshot first (required for foreign key constraints)
            database
                .add_pool_snapshot(self.chain.chain_id, pool_address, snapshot)
                .await?;

            let positions: Vec<(Address, PoolPosition)> = snapshot
                .positions
                .iter()
                .map(|pos| (*pool_address, pos.clone()))
                .collect();
            if !positions.is_empty() {
                database
                    .add_pool_positions_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &positions,
                    )
                    .await?;
            }

            let ticks: Vec<(Address, &PoolTick)> = snapshot
                .ticks
                .iter()
                .map(|tick| (*pool_address, tick))
                .collect();
            if !ticks.is_empty() {
                database
                    .add_pool_ticks_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &ticks,
                    )
                    .await?;
            }
        }

        Ok(())
    }

    /// Updates the pool's initial price and tick from an [`InitializeEvent`], persisting the
    /// change to the database if available and refreshing the cached pool.
    ///
    /// # Errors
    ///
    /// Returns an error if updating the pool in the database fails.
    pub async fn update_pool_initialize_price_tick(
        &mut self,
        initialize_event: &InitializeEvent,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
                .await?;
        }

        // Update the cached pool if it exists
        if let Some(cached_pool) = self.pools.get(&initialize_event.pool_address) {
            let mut updated_pool = (**cached_pool).clone();
            updated_pool.initialize(initialize_event.sqrt_price_x96);

            self.pools
                .insert(initialize_event.pool_address, Arc::new(updated_pool));
        }

        Ok(())
    }

    /// Returns the [`SharedDex`] registered under the given DEX identifier, if any.
    #[must_use]
    pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
        self.dexes.get(dex_id).cloned()
    }

    /// Returns the set of `DexType`s registered in the cache.
    #[must_use]
    pub fn get_registered_dexes(&self) -> HashSet<DexType> {
        self.dexes.keys().copied().collect()
    }

    /// Returns a reference to the pool associated with the given address.
    #[must_use]
    pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
        self.pools.get(address)
    }

    /// Returns a reference to the `Token` associated with the given address.
    #[must_use]
    pub fn get_token(&self, address: &Address) -> Option<&Token> {
        self.tokens.get(address)
    }

    /// Checks if a token address is marked as invalid in the cache.
    ///
    /// Returns `true` if the address was previously recorded as invalid due to
    /// validation or processing failures.
    #[must_use]
    pub fn is_invalid_token(&self, address: &Address) -> bool {
        self.invalid_tokens.contains(address)
    }

    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn update_dex_last_synced_block(
        &self,
        dex: &DexType,
        block_number: u64,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
                .await
        } else {
            Ok(())
        }
    }

    /// Saves the checkpoint block number indicating the last completed synchronization for a
    /// specific pool of a DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn update_pool_last_synced_block(
        &self,
        dex: &DexType,
        pool_address: &Address,
        block_number: u64,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .update_pool_last_synced_block(self.chain.chain_id, dex, pool_address, block_number)
                .await
        } else {
            Ok(())
        }
    }

    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
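    ///
    /// A resume sketch, assuming `dex_id`, `start_block`, and `latest_block` are defined by the
    /// caller (the names are illustrative):
    ///
    /// ```ignore
    /// // Resume syncing from the last persisted checkpoint, or from `start_block` if none exists.
    /// let from_block = cache
    ///     .get_dex_last_synced_block(&dex_id)
    ///     .await?
    ///     .unwrap_or(start_block);
    ///
    /// // ... sync pools from `from_block`, then persist the new checkpoint.
    /// cache.update_dex_last_synced_block(&dex_id, latest_block).await?;
    /// ```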
    pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
        if let Some(database) = &self.database {
            database
                .get_dex_last_synced_block(self.chain.chain_id, dex)
                .await
        } else {
            Ok(None)
        }
    }

    /// Retrieves the saved checkpoint block number from the last completed synchronization for a
    /// specific pool of a DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn get_pool_last_synced_block(
        &self,
        dex: &DexType,
        pool_address: &Address,
    ) -> anyhow::Result<Option<u64>> {
        if let Some(database) = &self.database {
            database
                .get_pool_last_synced_block(self.chain.chain_id, dex, pool_address)
                .await
        } else {
            Ok(None)
        }
    }

    /// Retrieves the maximum block number across all pool event tables for a given pool.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the database queries fail.
    pub async fn get_pool_event_tables_last_block(
        &self,
        pool_address: &Address,
    ) -> anyhow::Result<Option<u64>> {
        if let Some(database) = &self.database {
            let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
                database.get_table_last_block(self.chain.chain_id, "pool_swap_event", pool_address),
                database.get_table_last_block(
                    self.chain.chain_id,
                    "pool_liquidity_event",
                    pool_address
                ),
                database.get_table_last_block(
                    self.chain.chain_id,
                    "pool_collect_event",
                    pool_address
                ),
            )?;

            let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
                .into_iter()
                .flatten()
                .max();
            Ok(max_block)
        } else {
            Ok(None)
        }
    }
}