nautilus_blockchain/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Caching layer for blockchain entities and domain objects.
17//!
18//! This module provides an in-memory cache with optional PostgreSQL persistence for storing
19//! and retrieving blockchain-related data such as blocks, tokens, pools, swaps, and other
20//! DeFi protocol events.
21
22use std::{
23    collections::{BTreeMap, HashMap, HashSet},
24    sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30    Block, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, SharedPool, Token,
31    data::PoolFeeCollect,
32};
33use sqlx::postgres::PgConnectOptions;
34
35use crate::{
36    cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
37    events::initialize::InitializeEvent,
38};
39
40pub mod consistency;
41pub mod copy;
42pub mod database;
43pub mod rows;
44
/// Provides caching functionality for various blockchain domain objects.
///
/// Holds all state in memory; persistence is best-effort and only active when
/// `database` is `Some` (see `initialize_database`).
#[derive(Debug)]
pub struct BlockchainCache {
    /// The blockchain chain this cache is associated with.
    chain: SharedChain,
    /// Map of block numbers to their corresponding timestamps.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Map of DEX identifiers to their corresponding DEX objects.
    dexes: HashMap<DexType, SharedDex>,
    /// Map of token addresses to their corresponding `Token` objects.
    tokens: HashMap<Address, Token>,
    /// Cached set of invalid token addresses that failed validation or processing.
    invalid_tokens: HashSet<Address>,
    /// Map of pool addresses to their corresponding `Pool` objects.
    pools: HashMap<Address, SharedPool>,
    /// Optional database connection for persistent storage; when `None`, all
    /// operations are in-memory only and database steps are skipped.
    database: Option<BlockchainCacheDatabase>,
}
63
64impl BlockchainCache {
65    /// Creates a new in-memory blockchain cache for the specified chain.
66    #[must_use]
67    pub fn new(chain: SharedChain) -> Self {
68        Self {
69            chain,
70            dexes: HashMap::new(),
71            tokens: HashMap::new(),
72            invalid_tokens: HashSet::new(),
73            pools: HashMap::new(),
74            block_timestamps: BTreeMap::new(),
75            database: None,
76        }
77    }
78
79    /// Returns the highest continuous block number currently cached, if any.
80    pub async fn get_cache_block_consistency_status(
81        &self,
82    ) -> Option<CachedBlocksConsistencyStatus> {
83        let database = self.database.as_ref()?;
84        database
85            .get_block_consistency_status(&self.chain)
86            .await
87            .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
88            .ok()
89    }
90
91    /// Returns the earliest block number where any DEX in the cache was created on the blockchain.
92    #[must_use]
93    pub fn min_dex_creation_block(&self) -> Option<u64> {
94        self.dexes
95            .values()
96            .map(|dex| dex.factory_creation_block)
97            .min()
98    }
99
100    /// Returns the timestamp for the specified block number if it exists in the cache.
101    #[must_use]
102    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
103        self.block_timestamps.get(&block_number)
104    }
105
106    /// Initializes the database connection for persistent storage.
107    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
108        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
109        self.database = Some(database);
110    }
111
112    /// Toggles performance optimization settings in the database.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the database is not initialized or the operation fails.
117    pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
118        if let Some(database) = &self.database {
119            database.toggle_perf_sync_settings(enable).await
120        } else {
121            tracing::warn!("Database not initialized, skipping performance settings toggle");
122            Ok(())
123        }
124    }
125
126    /// Initializes the chain by seeding it in the database and creating necessary partitions.
127    ///
128    /// This method sets up the blockchain chain in the database, creates block and token
129    /// partitions for optimal performance, and loads existing tokens into the cache.
130    pub async fn initialize_chain(&mut self) {
131        // Seed target adapter chain in database
132        if let Some(database) = &self.database {
133            if let Err(e) = database.seed_chain(&self.chain).await {
134                tracing::error!(
135                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
136                );
137                return;
138            }
139            tracing::info!("Chain seeded in the database");
140
141            match database.create_block_partition(&self.chain).await {
142                Ok(message) => tracing::info!("Executing block partition creation: {}", message),
143                Err(e) => tracing::error!(
144                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
145                    self.chain.chain_id
146                ),
147            }
148
149            match database.create_token_partition(&self.chain).await {
150                Ok(message) => tracing::info!("Executing token partition creation: {}", message),
151                Err(e) => tracing::error!(
152                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
153                    self.chain.chain_id
154                ),
155            }
156        }
157
158        if let Err(e) = self.load_tokens().await {
159            tracing::error!("Error loading tokens from the database: {e}");
160        }
161    }
162
163    /// Connects to the database and loads initial data.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if database seeding, token loading, or block loading fails.
168    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
169        tracing::debug!("Connecting and loading from_block {from_block}");
170
171        if let Err(e) = self.load_tokens().await {
172            tracing::error!("Error loading tokens from the database: {e}");
173        }
174
175        // TODO disable block syncing for now as we don't have timestamps yet configured
176        // if let Err(e) = self.load_blocks(from_block).await {
177        //     log::error!("Error loading blocks from database: {e}");
178        // }
179
180        Ok(())
181    }
182
183    /// Loads tokens from the database into the in-memory cache.
184    async fn load_tokens(&mut self) -> anyhow::Result<()> {
185        if let Some(database) = &self.database {
186            let (tokens, invalid_tokens) = tokio::try_join!(
187                database.load_tokens(self.chain.clone()),
188                database.load_invalid_token_addresses(self.chain.chain_id)
189            )?;
190
191            tracing::info!(
192                "Loading {} valid tokens and {} invalid tokens from cache database",
193                tokens.len(),
194                invalid_tokens.len()
195            );
196
197            self.tokens
198                .extend(tokens.into_iter().map(|token| (token.address, token)));
199            self.invalid_tokens.extend(invalid_tokens);
200        }
201        Ok(())
202    }
203
204    /// Loads DEX exchange pools from the database into the in-memory cache.
205    ///
206    /// # Errors
207    ///
208    /// Returns an error if the DEX has not been registered or if database operations fail.
209    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<()> {
210        if let Some(database) = &self.database {
211            let dex = self
212                .get_dex(dex_id)
213                .ok_or_else(|| anyhow::anyhow!("DEX {:?} has not been registered", dex_id))?;
214            let pool_rows = database
215                .load_pools(self.chain.clone(), &dex_id.to_string())
216                .await?;
217            tracing::info!(
218                "Loading {} pools for DEX {} from cache database",
219                pool_rows.len(),
220                dex_id,
221            );
222
223            for pool_row in pool_rows {
224                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
225                    token
226                } else {
227                    tracing::error!(
228                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
229                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
230                        pool_row.address,
231                        dex_id,
232                        pool_row.token0_address
233                    );
234                    continue;
235                };
236
237                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
238                    token
239                } else {
240                    tracing::error!(
241                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
242                             This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
243                        pool_row.address,
244                        dex_id,
245                        pool_row.token1_address
246                    );
247                    continue;
248                };
249
250                // Construct pool from row data and cached tokens
251                let pool = Pool::new(
252                    self.chain.clone(),
253                    dex.clone(),
254                    pool_row.address,
255                    pool_row.creation_block as u64,
256                    token0.clone(),
257                    token1.clone(),
258                    pool_row.fee.map(|fee| fee as u32),
259                    pool_row
260                        .tick_spacing
261                        .map(|tick_spacing| tick_spacing as u32),
262                    UnixNanos::default(), // TODO use default for now
263                );
264
265                // Add pool to cache
266                self.pools.insert(pool.address, Arc::new(pool));
267            }
268        }
269        Ok(())
270    }
271
272    /// Loads block timestamps from the database starting `from_block` number
273    /// into the in-memory cache.
274    #[allow(dead_code)] // TODO: Under development
275    async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
276        if let Some(database) = &self.database {
277            let block_timestamps = database
278                .load_block_timestamps(self.chain.clone(), from_block)
279                .await?;
280
281            // Verify block number sequence consistency
282            if !block_timestamps.is_empty() {
283                let first = block_timestamps.first().unwrap().number;
284                let last = block_timestamps.last().unwrap().number;
285                let expected_len = (last - first + 1) as usize;
286                if block_timestamps.len() != expected_len {
287                    anyhow::bail!(
288                        "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
289                        block_timestamps.len()
290                    );
291                }
292            }
293
294            if block_timestamps.is_empty() {
295                tracing::info!("No blocks found in database");
296                return Ok(());
297            }
298
299            tracing::info!(
300                "Loading {} blocks timestamps from the cache database with last block number {}",
301                block_timestamps.len(),
302                block_timestamps.last().unwrap().number,
303            );
304            for block in block_timestamps {
305                self.block_timestamps.insert(block.number, block.timestamp);
306            }
307        }
308        Ok(())
309    }
310
311    /// Adds a block to the cache and persists it to the database if available.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if adding the block to the database fails.
316    pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
317        if let Some(database) = &self.database {
318            database.add_block(self.chain.chain_id, &block).await?;
319        }
320        self.block_timestamps.insert(block.number, block.timestamp);
321        Ok(())
322    }
323
324    /// Adds multiple blocks to the cache and persists them to the database in batch if available.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if adding the blocks to the database fails.
329    pub async fn add_blocks_batch(
330        &mut self,
331        blocks: Vec<Block>,
332        use_copy_command: bool,
333    ) -> anyhow::Result<()> {
334        if blocks.is_empty() {
335            return Ok(());
336        }
337
338        if let Some(database) = &self.database {
339            if use_copy_command {
340                database
341                    .add_blocks_copy(self.chain.chain_id, &blocks)
342                    .await?;
343            } else {
344                database
345                    .add_blocks_batch(self.chain.chain_id, &blocks)
346                    .await?;
347            }
348        }
349
350        // Update in-memory cache
351        for block in blocks {
352            self.block_timestamps.insert(block.number, block.timestamp);
353        }
354
355        Ok(())
356    }
357
358    /// Adds a DEX to the cache with the specified identifier.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if adding the DEX to the database fails.
363    pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
364        tracing::info!("Adding dex {} to the cache", dex.name);
365
366        if let Some(database) = &self.database {
367            database.add_dex(dex.clone()).await?;
368        }
369
370        self.dexes.insert(dex.name, dex);
371        Ok(())
372    }
373
374    /// Adds a liquidity pool/pair to the cache.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if adding the pool to the database fails.
379    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
380        let pool_address = pool.address;
381        if let Some(database) = &self.database {
382            database.add_pool(&pool).await?;
383        }
384
385        self.pools.insert(pool_address, Arc::new(pool));
386        Ok(())
387    }
388
389    /// Adds multiple pools to the cache and persists them to the database in batch if available.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if adding the pools to the database fails.
394    pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
395        if pools.is_empty() {
396            return Ok(());
397        }
398
399        if let Some(database) = &self.database {
400            database.add_pools_batch(&pools).await?;
401        }
402
403        Ok(())
404    }
405
406    /// Adds a [`Token`] to the cache.
407    ///
408    /// # Errors
409    ///
410    /// Returns an error if adding the token to the database fails.
411    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
412        if let Some(database) = &self.database {
413            database.add_token(&token).await?;
414        }
415        self.tokens.insert(token.address, token);
416        Ok(())
417    }
418
419    /// Adds an invalid token address with associated error information to the cache.
420    ///
421    /// # Errors
422    ///
423    /// Returns an error if adding the invalid token to the database fails.
424    pub async fn add_invalid_token(
425        &mut self,
426        address: Address,
427        error_string: &str,
428    ) -> anyhow::Result<()> {
429        if let Some(database) = &self.database {
430            database
431                .add_invalid_token(self.chain.chain_id, &address, error_string)
432                .await?;
433        }
434        self.invalid_tokens.insert(address);
435        Ok(())
436    }
437
438    /// Adds a [`PoolSwap`] to the cache database if available.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if adding the swap to the database fails.
443    pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
444        if let Some(database) = &self.database {
445            database.add_swap(self.chain.chain_id, swap).await?;
446        }
447
448        Ok(())
449    }
450
451    /// Adds a [`PoolLiquidityUpdate`] to the cache database if available.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if adding the liquidity update to the database fails.
456    pub async fn add_liquidity_update(
457        &self,
458        liquidity_update: &PoolLiquidityUpdate,
459    ) -> anyhow::Result<()> {
460        if let Some(database) = &self.database {
461            database
462                .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
463                .await?;
464        }
465
466        Ok(())
467    }
468
469    /// Adds multiple [`PoolSwap`]s to the cache database in a single batch operation if available.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if adding the swaps to the database fails.
474    pub async fn add_pool_swaps_batch(
475        &self,
476        swaps: &[PoolSwap],
477        use_copy_command: bool,
478    ) -> anyhow::Result<()> {
479        if let Some(database) = &self.database {
480            if use_copy_command {
481                database
482                    .add_pool_swaps_copy(self.chain.chain_id, swaps)
483                    .await?;
484            } else {
485                database
486                    .add_pool_swaps_batch(self.chain.chain_id, swaps)
487                    .await?;
488            }
489        }
490
491        Ok(())
492    }
493
494    /// Adds multiple [`PoolLiquidityUpdate`]s to the cache database in a single batch operation if available.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if adding the liquidity updates to the database fails.
499    pub async fn add_pool_liquidity_updates_batch(
500        &self,
501        updates: &[PoolLiquidityUpdate],
502        use_copy_command: bool,
503    ) -> anyhow::Result<()> {
504        if let Some(database) = &self.database {
505            if use_copy_command {
506                database
507                    .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
508                    .await?;
509            } else {
510                database
511                    .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
512                    .await?;
513            }
514        }
515
516        Ok(())
517    }
518
519    /// Adds a batch of pool fee collect events to the cache.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if adding the fee collects to the database fails.
524    pub async fn add_pool_fee_collects_batch(
525        &self,
526        collects: &[PoolFeeCollect],
527        use_copy_command: bool,
528    ) -> anyhow::Result<()> {
529        if let Some(database) = &self.database {
530            if use_copy_command {
531                database
532                    .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
533                    .await?;
534            } else {
535                database
536                    .add_pool_collects_batch(self.chain.chain_id, collects)
537                    .await?;
538            }
539        }
540
541        Ok(())
542    }
543
544    pub async fn update_pool_initialize_price_tick(
545        &self,
546        initialize_event: &InitializeEvent,
547    ) -> anyhow::Result<()> {
548        if let Some(database) = &self.database {
549            database
550                .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
551                .await?;
552        }
553
554        Ok(())
555    }
556
557    /// Returns a reference to the `DexExtended` associated with the given name.
558    #[must_use]
559    pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
560        self.dexes.get(dex_id).cloned()
561    }
562
563    /// Returns a list of registered `DexType` in the cache.
564    #[must_use]
565    pub fn get_registered_dexes(&self) -> HashSet<DexType> {
566        self.dexes.keys().copied().collect()
567    }
568
569    /// Returns a reference to the pool associated with the given address.
570    #[must_use]
571    pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
572        self.pools.get(address)
573    }
574
575    /// Returns a reference to the `Token` associated with the given address.
576    #[must_use]
577    pub fn get_token(&self, address: &Address) -> Option<&Token> {
578        self.tokens.get(address)
579    }
580
581    /// Checks if a token address is marked as invalid in the cache.
582    ///
583    /// Returns `true` if the address was previously recorded as invalid due to
584    /// validation or processing failures.
585    #[must_use]
586    pub fn is_invalid_token(&self, address: &Address) -> bool {
587        self.invalid_tokens.contains(address)
588    }
589
590    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
591    ///
592    /// # Errors
593    ///
594    /// Returns an error if the database operation fails.
595    pub async fn update_dex_last_synced_block(
596        &self,
597        dex: &DexType,
598        block_number: u64,
599    ) -> anyhow::Result<()> {
600        if let Some(database) = &self.database {
601            database
602                .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
603                .await
604        } else {
605            Ok(())
606        }
607    }
608
609    pub async fn update_pool_last_synced_block(
610        &self,
611        dex: &DexType,
612        pool_address: &Address,
613        block_number: u64,
614    ) -> anyhow::Result<()> {
615        if let Some(database) = &self.database {
616            database
617                .update_pool_last_synced_block(self.chain.chain_id, dex, pool_address, block_number)
618                .await
619        } else {
620            Ok(())
621        }
622    }
623
624    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the database query fails.
629    pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
630        if let Some(database) = &self.database {
631            database
632                .get_dex_last_synced_block(self.chain.chain_id, dex)
633                .await
634        } else {
635            Ok(None)
636        }
637    }
638
639    pub async fn get_pool_last_synced_block(
640        &self,
641        dex: &DexType,
642        pool_address: &Address,
643    ) -> anyhow::Result<Option<u64>> {
644        if let Some(database) = &self.database {
645            database
646                .get_pool_last_synced_block(self.chain.chain_id, dex, pool_address)
647                .await
648        } else {
649            Ok(None)
650        }
651    }
652
653    /// Retrieves the maximum block number across all pool event tables for a given pool.
654    ///
655    /// # Errors
656    ///
657    /// Returns an error if any of the database queries fail.
658    pub async fn get_pool_event_tables_last_block(
659        &self,
660        pool_address: &Address,
661    ) -> anyhow::Result<Option<u64>> {
662        if let Some(database) = &self.database {
663            let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
664                database.get_table_last_block(self.chain.chain_id, "pool_swap_event", pool_address),
665                database.get_table_last_block(
666                    self.chain.chain_id,
667                    "pool_liquidity_event",
668                    pool_address
669                ),
670                database.get_table_last_block(
671                    self.chain.chain_id,
672                    "pool_collect_event",
673                    pool_address
674                ),
675            )?;
676
677            let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
678                .into_iter()
679                .filter_map(|x| x)
680                .max();
681            Ok(max_block)
682        } else {
683            Ok(None)
684        }
685    }
686}