// nautilus_blockchain/cache/mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Caching layer for blockchain entities and domain objects.
17//!
18//! This module provides an in-memory cache with optional PostgreSQL persistence for storing
19//! and retrieving blockchain-related data such as blocks, tokens, pools, swaps, and other
20//! DeFi protocol events.
21
22use std::{
23    collections::{BTreeMap, HashMap, HashSet},
24    sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30    Block, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, SharedPool, Token,
31    data::{PoolFeeCollect, PoolFlash},
32    pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
33    tick_map::tick::PoolTick,
34};
35use sqlx::postgres::PgConnectOptions;
36
37use crate::{
38    cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
39    events::initialize::InitializeEvent,
40};
41
42pub mod consistency;
43pub mod copy;
44pub mod database;
45pub mod rows;
46pub mod types;
47
/// Provides caching functionality for various blockchain domain objects.
///
/// All lookups are served from in-memory maps; the optional `database` handle, when
/// present, is used by the `add_*`/`load_*` methods to persist and restore state.
#[derive(Debug)]
pub struct BlockchainCache {
    /// The blockchain chain this cache is associated with.
    chain: SharedChain,
    /// Map of block numbers to their corresponding timestamps.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Map of DEX identifiers to their corresponding DEX objects.
    dexes: HashMap<DexType, SharedDex>,
    /// Map of token addresses to their corresponding `Token` objects.
    tokens: HashMap<Address, Token>,
    /// Cached set of invalid token addresses that failed validation or processing.
    invalid_tokens: HashSet<Address>,
    /// Map of pool addresses to their corresponding `Pool` objects.
    pools: HashMap<Address, SharedPool>,
    /// Optional database connection for persistent storage; when `None`, the cache
    /// operates purely in memory.
    pub database: Option<BlockchainCacheDatabase>,
}
66
67impl BlockchainCache {
68    /// Creates a new in-memory blockchain cache for the specified chain.
69    #[must_use]
70    pub fn new(chain: SharedChain) -> Self {
71        Self {
72            chain,
73            dexes: HashMap::new(),
74            tokens: HashMap::new(),
75            invalid_tokens: HashSet::new(),
76            pools: HashMap::new(),
77            block_timestamps: BTreeMap::new(),
78            database: None,
79        }
80    }
81
    /// Returns the consistency status of the blocks cached in the database, if a
    /// database connection is configured.
    ///
    /// Returns `None` when no database is configured; database errors are logged
    /// and converted to `None` rather than propagated.
    pub async fn get_cache_block_consistency_status(
        &self,
    ) -> Option<CachedBlocksConsistencyStatus> {
        let database = self.database.as_ref()?;
        database
            .get_block_consistency_status(&self.chain)
            .await
            .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
            .ok()
    }
93
94    /// Returns the earliest block number where any DEX in the cache was created on the blockchain.
95    #[must_use]
96    pub fn min_dex_creation_block(&self) -> Option<u64> {
97        self.dexes
98            .values()
99            .map(|dex| dex.factory_creation_block)
100            .min()
101    }
102
    /// Returns the timestamp for the specified block number if it exists in the cache.
    ///
    /// Only blocks previously added via `add_block`/`add_blocks_batch` (or loaded
    /// from the database) are present.
    #[must_use]
    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
        self.block_timestamps.get(&block_number)
    }
108
109    /// Initializes the database connection for persistent storage.
110    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
111        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
112        self.database = Some(database);
113    }
114
115    /// Toggles performance optimization settings in the database.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the database is not initialized or the operation fails.
120    pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
121        if let Some(database) = &self.database {
122            database.toggle_perf_sync_settings(enable).await
123        } else {
124            tracing::warn!("Database not initialized, skipping performance settings toggle");
125            Ok(())
126        }
127    }
128
    /// Initializes the chain by seeding it in the database and creating necessary partitions.
    ///
    /// This method sets up the blockchain chain in the database, creates block and token
    /// partitions for optimal performance, and loads existing tokens into the cache.
    /// All database failures are logged and degraded around rather than propagated, so
    /// the cache remains usable in a purely in-memory mode.
    pub async fn initialize_chain(&mut self) {
        // Seed target adapter chain in database
        if let Some(database) = &self.database {
            if let Err(e) = database.seed_chain(&self.chain).await {
                tracing::error!(
                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
                );
                // Seeding failed: skip partition creation and token loading entirely
                return;
            }
            tracing::info!("Chain seeded in the database");

            // Partition creation failures are non-fatal; the tables still work unpartitioned
            match database.create_block_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing block partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }

            match database.create_token_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing token partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }
        }

        // Warm the in-memory token cache from any previously persisted rows
        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }
    }
165
    /// Connects to the database and loads initial data.
    ///
    /// Token-loading failures are logged rather than propagated, and block syncing is
    /// currently disabled, so this presently always returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Reserved for future loading steps (e.g. block syncing once re-enabled); no
    /// error is currently returned.
    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
        tracing::debug!("Connecting and loading from_block {from_block}");

        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }

        // TODO disable block syncing for now as we don't have timestamps yet configured
        // if let Err(e) = self.load_blocks(from_block).await {
        //     log::error!("Error loading blocks from database: {e}");
        // }

        Ok(())
    }
185
186    /// Loads tokens from the database into the in-memory cache.
187    async fn load_tokens(&mut self) -> anyhow::Result<()> {
188        if let Some(database) = &self.database {
189            let (tokens, invalid_tokens) = tokio::try_join!(
190                database.load_tokens(self.chain.clone()),
191                database.load_invalid_token_addresses(self.chain.chain_id)
192            )?;
193
194            tracing::info!(
195                "Loading {} valid tokens and {} invalid tokens from cache database",
196                tokens.len(),
197                invalid_tokens.len()
198            );
199
200            self.tokens
201                .extend(tokens.into_iter().map(|token| (token.address, token)));
202            self.invalid_tokens.extend(invalid_tokens);
203        }
204        Ok(())
205    }
206
    /// Loads DEX exchange pools from the database into the in-memory cache.
    ///
    /// Pools whose tokens are missing from the token cache are skipped (with an error
    /// log) rather than failing the whole load, so `load_tokens` should run first.
    ///
    /// Returns the loaded pools.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX has not been registered or if database operations fail.
    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
        let mut loaded_pools = Vec::new();

        if let Some(database) = &self.database {
            let dex = self
                .get_dex(dex_id)
                .ok_or_else(|| anyhow::anyhow!("DEX {:?} has not been registered", dex_id))?;
            let pool_rows = database
                .load_pools(self.chain.clone(), &dex_id.to_string())
                .await?;
            tracing::info!(
                "Loading {} pools for DEX {} from cache database",
                pool_rows.len(),
                dex_id,
            );

            for pool_row in pool_rows {
                // Both tokens must already be cached; otherwise skip this pool
                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token0_address
                    );
                    continue;
                };

                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token1_address
                    );
                    continue;
                };

                // Construct pool from row data and cached tokens
                let mut pool = Pool::new(
                    self.chain.clone(),
                    dex.clone(),
                    pool_row.address,
                    pool_row.creation_block as u64,
                    token0.clone(),
                    token1.clone(),
                    pool_row.fee.map(|fee| fee as u32),
                    pool_row
                        .tick_spacing
                        .map(|tick_spacing| tick_spacing as u32),
                    UnixNanos::default(), // TODO use default for now
                );

                // Initialize pool with initial values if available
                // NOTE(review): an unparsable sqrt price is silently ignored — confirm intended
                if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96 {
                    if let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse() {
                        pool.initialize(initial_sqrt_price_x96);
                    }
                }

                // Add pool to cache and loaded pools list
                loaded_pools.push(pool.clone());
                self.pools.insert(pool.address, Arc::new(pool));
            }
        }
        Ok(loaded_pools)
    }
286
287    /// Loads block timestamps from the database starting `from_block` number
288    /// into the in-memory cache.
289    #[allow(dead_code, reason = "TODO: Under development")]
290    async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
291        if let Some(database) = &self.database {
292            let block_timestamps = database
293                .load_block_timestamps(self.chain.clone(), from_block)
294                .await?;
295
296            // Verify block number sequence consistency
297            if !block_timestamps.is_empty() {
298                let first = block_timestamps.first().unwrap().number;
299                let last = block_timestamps.last().unwrap().number;
300                let expected_len = (last - first + 1) as usize;
301                if block_timestamps.len() != expected_len {
302                    anyhow::bail!(
303                        "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
304                        block_timestamps.len()
305                    );
306                }
307            }
308
309            if block_timestamps.is_empty() {
310                tracing::info!("No blocks found in database");
311                return Ok(());
312            }
313
314            tracing::info!(
315                "Loading {} blocks timestamps from the cache database with last block number {}",
316                block_timestamps.len(),
317                block_timestamps.last().unwrap().number,
318            );
319            for block in block_timestamps {
320                self.block_timestamps.insert(block.number, block.timestamp);
321            }
322        }
323        Ok(())
324    }
325
326    /// Adds a block to the cache and persists it to the database if available.
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if adding the block to the database fails.
331    pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
332        if let Some(database) = &self.database {
333            database.add_block(self.chain.chain_id, &block).await?;
334        }
335        self.block_timestamps.insert(block.number, block.timestamp);
336        Ok(())
337    }
338
339    /// Adds multiple blocks to the cache and persists them to the database in batch if available.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if adding the blocks to the database fails.
344    pub async fn add_blocks_batch(
345        &mut self,
346        blocks: Vec<Block>,
347        use_copy_command: bool,
348    ) -> anyhow::Result<()> {
349        if blocks.is_empty() {
350            return Ok(());
351        }
352
353        if let Some(database) = &self.database {
354            if use_copy_command {
355                database
356                    .add_blocks_copy(self.chain.chain_id, &blocks)
357                    .await?;
358            } else {
359                database
360                    .add_blocks_batch(self.chain.chain_id, &blocks)
361                    .await?;
362            }
363        }
364
365        // Update in-memory cache
366        for block in blocks {
367            self.block_timestamps.insert(block.number, block.timestamp);
368        }
369
370        Ok(())
371    }
372
373    /// Adds a DEX to the cache with the specified identifier.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if adding the DEX to the database fails.
378    pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
379        tracing::info!("Adding dex {} to the cache", dex.name);
380
381        if let Some(database) = &self.database {
382            database.add_dex(dex.clone()).await?;
383        }
384
385        self.dexes.insert(dex.name, dex);
386        Ok(())
387    }
388
389    /// Adds a liquidity pool/pair to the cache.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if adding the pool to the database fails.
394    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
395        let pool_address = pool.address;
396        if let Some(database) = &self.database {
397            database.add_pool(&pool).await?;
398        }
399
400        self.pools.insert(pool_address, Arc::new(pool));
401        Ok(())
402    }
403
404    /// Adds multiple pools to the cache and persists them to the database in batch if available.
405    ///
406    /// # Errors
407    ///
408    /// Returns an error if adding the pools to the database fails.
409    pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
410        if pools.is_empty() {
411            return Ok(());
412        }
413
414        if let Some(database) = &self.database {
415            database.add_pools_copy(self.chain.chain_id, &pools).await?;
416        }
417        self.pools
418            .extend(pools.into_iter().map(|pool| (pool.address, Arc::new(pool))));
419
420        Ok(())
421    }
422
423    /// Adds a [`Token`] to the cache.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if adding the token to the database fails.
428    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
429        if let Some(database) = &self.database {
430            database.add_token(&token).await?;
431        }
432        self.tokens.insert(token.address, token);
433        Ok(())
434    }
435
436    /// Adds multiple tokens to the cache and persists them to the database in batch if available.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if adding the tokens to the database fails.
441    pub async fn add_tokens_batch(&mut self, tokens: Vec<Token>) -> anyhow::Result<()> {
442        if tokens.is_empty() {
443            return Ok(());
444        }
445
446        if let Some(database) = &self.database {
447            database
448                .add_tokens_copy(self.chain.chain_id, &tokens)
449                .await?;
450        }
451
452        self.tokens
453            .extend(tokens.into_iter().map(|token| (token.address, token)));
454
455        Ok(())
456    }
457
    /// Updates the in-memory token cache without persisting to the database.
    ///
    /// Overwrites any existing entry for the same address.
    pub fn insert_token_in_memory(&mut self, token: Token) {
        self.tokens.insert(token.address, token);
    }
462
    /// Marks a token address as invalid in the in-memory cache without persisting to the database.
    ///
    /// Idempotent: inserting an already-known invalid address is a no-op.
    pub fn insert_invalid_token_in_memory(&mut self, address: Address) {
        self.invalid_tokens.insert(address);
    }
467
468    /// Adds an invalid token address with associated error information to the cache.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if adding the invalid token to the database fails.
473    pub async fn add_invalid_token(
474        &mut self,
475        address: Address,
476        error_string: &str,
477    ) -> anyhow::Result<()> {
478        if let Some(database) = &self.database {
479            database
480                .add_invalid_token(self.chain.chain_id, &address, error_string)
481                .await?;
482        }
483        self.invalid_tokens.insert(address);
484        Ok(())
485    }
486
487    /// Adds a [`PoolSwap`] to the cache database if available.
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if adding the swap to the database fails.
492    pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
493        if let Some(database) = &self.database {
494            database.add_swap(self.chain.chain_id, swap).await?;
495        }
496
497        Ok(())
498    }
499
500    /// Adds a [`PoolLiquidityUpdate`] to the cache database if available.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if adding the liquidity update to the database fails.
505    pub async fn add_liquidity_update(
506        &self,
507        liquidity_update: &PoolLiquidityUpdate,
508    ) -> anyhow::Result<()> {
509        if let Some(database) = &self.database {
510            database
511                .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
512                .await?;
513        }
514
515        Ok(())
516    }
517
518    /// Adds multiple [`PoolSwap`]s to the cache database in a single batch operation if available.
519    ///
520    /// # Errors
521    ///
522    /// Returns an error if adding the swaps to the database fails.
523    pub async fn add_pool_swaps_batch(
524        &self,
525        swaps: &[PoolSwap],
526        use_copy_command: bool,
527    ) -> anyhow::Result<()> {
528        if let Some(database) = &self.database {
529            if use_copy_command {
530                database
531                    .add_pool_swaps_copy(self.chain.chain_id, swaps)
532                    .await?;
533            } else {
534                database
535                    .add_pool_swaps_batch(self.chain.chain_id, swaps)
536                    .await?;
537            }
538        }
539
540        Ok(())
541    }
542
543    /// Adds multiple [`PoolLiquidityUpdate`]s to the cache database in a single batch operation if available.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if adding the liquidity updates to the database fails.
548    pub async fn add_pool_liquidity_updates_batch(
549        &self,
550        updates: &[PoolLiquidityUpdate],
551        use_copy_command: bool,
552    ) -> anyhow::Result<()> {
553        if let Some(database) = &self.database {
554            if use_copy_command {
555                database
556                    .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
557                    .await?;
558            } else {
559                database
560                    .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
561                    .await?;
562            }
563        }
564
565        Ok(())
566    }
567
568    /// Adds a batch of pool fee collect events to the cache.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if adding the fee collects to the database fails.
573    pub async fn add_pool_fee_collects_batch(
574        &self,
575        collects: &[PoolFeeCollect],
576        use_copy_command: bool,
577    ) -> anyhow::Result<()> {
578        if let Some(database) = &self.database {
579            if use_copy_command {
580                database
581                    .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
582                    .await?;
583            } else {
584                database
585                    .add_pool_collects_batch(self.chain.chain_id, collects)
586                    .await?;
587            }
588        }
589
590        Ok(())
591    }
592
593    /// Adds a batch of pool flash events to the cache.
594    ///
595    /// # Errors
596    ///
597    /// Returns an error if adding the flash events to the database fails.
598    pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
599        if let Some(database) = &self.database {
600            database
601                .add_pool_flash_batch(self.chain.chain_id, flash_events)
602                .await?;
603        }
604
605        Ok(())
606    }
607
    /// Adds a pool snapshot to the cache database.
    ///
    /// This method saves the complete snapshot including:
    /// - Pool state and analytics (pool_snapshot table)
    /// - All positions at this snapshot (pool_position table)
    /// - All ticks at this snapshot (pool_tick table)
    ///
    /// The snapshot row is written before positions and ticks because those tables
    /// reference it via foreign keys.
    ///
    /// # Errors
    ///
    /// Returns an error if adding the snapshot to the database fails.
    pub async fn add_pool_snapshot(
        &self,
        pool_address: &Address,
        snapshot: &PoolSnapshot,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            // Save snapshot first (required for foreign key constraints)
            database
                .add_pool_snapshot(self.chain.chain_id, pool_address, snapshot)
                .await?;

            // Pair each position with the pool address for the batch insert
            let positions: Vec<(Address, PoolPosition)> = snapshot
                .positions
                .iter()
                .map(|pos| (*pool_address, pos.clone()))
                .collect();
            if !positions.is_empty() {
                database
                    .add_pool_positions_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &positions,
                    )
                    .await?;
            }

            // Ticks are borrowed (not cloned) since the batch insert only reads them
            let ticks: Vec<(Address, &PoolTick)> = snapshot
                .ticks
                .iter()
                .map(|tick| (*pool_address, tick))
                .collect();
            if !ticks.is_empty() {
                database
                    .add_pool_ticks_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &ticks,
                    )
                    .await?;
            }
        }

        Ok(())
    }
666
    /// Applies a pool `Initialize` event: persists the initial price/tick to the database
    /// if available, and refreshes the cached pool entry with the new sqrt price.
    ///
    /// # Errors
    ///
    /// Returns an error if the database update fails.
    pub async fn update_pool_initialize_price_tick(
        &mut self,
        initialize_event: &InitializeEvent,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
                .await?;
        }

        // Update the cached pool if it exists
        // (clone-and-replace because the pool is shared behind an Arc)
        if let Some(cached_pool) = self.pools.get(&initialize_event.pool_address) {
            let mut updated_pool = (**cached_pool).clone();
            updated_pool.initialize(initialize_event.sqrt_price_x96);

            self.pools
                .insert(initialize_event.pool_address, Arc::new(updated_pool));
        }

        Ok(())
    }
688
    /// Returns a clone of the shared DEX instance registered under the given identifier,
    /// if any.
    #[must_use]
    pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
        self.dexes.get(dex_id).cloned()
    }
694
695    /// Returns a list of registered `DexType` in the cache.
696    #[must_use]
697    pub fn get_registered_dexes(&self) -> HashSet<DexType> {
698        self.dexes.keys().copied().collect()
699    }
700
    /// Returns a reference to the shared pool associated with the given address, if cached.
    #[must_use]
    pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
        self.pools.get(address)
    }
706
    /// Returns a reference to the `Token` associated with the given address, if cached.
    #[must_use]
    pub fn get_token(&self, address: &Address) -> Option<&Token> {
        self.tokens.get(address)
    }
712
    /// Checks if a token address is marked as invalid in the cache.
    ///
    /// Returns `true` if the address was previously recorded as invalid due to
    /// validation or processing failures.
    #[must_use]
    pub fn is_invalid_token(&self, address: &Address) -> bool {
        self.invalid_tokens.contains(address)
    }
721
722    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
723    ///
724    /// # Errors
725    ///
726    /// Returns an error if the database operation fails.
727    pub async fn update_dex_last_synced_block(
728        &self,
729        dex: &DexType,
730        block_number: u64,
731    ) -> anyhow::Result<()> {
732        if let Some(database) = &self.database {
733            database
734                .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
735                .await
736        } else {
737            Ok(())
738        }
739    }
740
    /// Saves the checkpoint block number indicating the last completed synchronization
    /// for a specific pool of a DEX.
    ///
    /// A no-op when no database is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn update_pool_last_synced_block(
        &self,
        dex: &DexType,
        pool_address: &Address,
        block_number: u64,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .update_pool_last_synced_block(self.chain.chain_id, dex, pool_address, block_number)
                .await
        } else {
            Ok(())
        }
    }
755
756    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the database query fails.
761    pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
762        if let Some(database) = &self.database {
763            database
764                .get_dex_last_synced_block(self.chain.chain_id, dex)
765                .await
766        } else {
767            Ok(None)
768        }
769    }
770
    /// Retrieves the saved checkpoint block number from the last completed synchronization
    /// for a specific pool of a DEX.
    ///
    /// Returns `Ok(None)` when no database is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn get_pool_last_synced_block(
        &self,
        dex: &DexType,
        pool_address: &Address,
    ) -> anyhow::Result<Option<u64>> {
        if let Some(database) = &self.database {
            database
                .get_pool_last_synced_block(self.chain.chain_id, dex, pool_address)
                .await
        } else {
            Ok(None)
        }
    }
784
785    /// Retrieves the maximum block number across all pool event tables for a given pool.
786    ///
787    /// # Errors
788    ///
789    /// Returns an error if any of the database queries fail.
790    pub async fn get_pool_event_tables_last_block(
791        &self,
792        pool_address: &Address,
793    ) -> anyhow::Result<Option<u64>> {
794        if let Some(database) = &self.database {
795            let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
796                database.get_table_last_block(self.chain.chain_id, "pool_swap_event", pool_address),
797                database.get_table_last_block(
798                    self.chain.chain_id,
799                    "pool_liquidity_event",
800                    pool_address
801                ),
802                database.get_table_last_block(
803                    self.chain.chain_id,
804                    "pool_collect_event",
805                    pool_address
806                ),
807            )?;
808
809            let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
810                .into_iter()
811                .filter_map(|x| x)
812                .max();
813            Ok(max_block)
814        } else {
815            Ok(None)
816        }
817    }
818}