1use std::{
23 collections::{BTreeMap, HashMap, HashSet},
24 sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30 Block, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
31 SharedPool, Token,
32 data::{PoolFeeCollect, PoolFlash},
33 pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
34 tick_map::tick::PoolTick,
35};
36use sqlx::postgres::PgConnectOptions;
37
38use crate::{
39 cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
40 events::initialize::InitializeEvent,
41};
42
43pub mod consistency;
44pub mod copy;
45pub mod database;
46pub mod rows;
47pub mod types;
48
/// In-memory cache of blockchain domain objects with optional Postgres-backed persistence.
#[derive(Debug)]
pub struct BlockchainCache {
    // Chain this cache serves; used for chain_id scoping on every database call.
    chain: SharedChain,
    // Block number -> block timestamp; BTreeMap keeps entries ordered by block number.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    // Registered DEXes keyed by their type.
    dexes: HashMap<DexType, SharedDex>,
    // Valid tokens keyed by contract address.
    tokens: HashMap<Address, Token>,
    // Addresses previously determined to be invalid tokens (skipped on lookup).
    invalid_tokens: HashSet<Address>,
    // Cached pools keyed by pool identifier.
    pools: HashMap<PoolIdentifier, SharedPool>,
    // Optional persistence backend; when `None` the cache is memory-only.
    pub database: Option<BlockchainCacheDatabase>,
}
67
68impl BlockchainCache {
69 #[must_use]
71 pub fn new(chain: SharedChain) -> Self {
72 Self {
73 chain,
74 dexes: HashMap::new(),
75 tokens: HashMap::new(),
76 invalid_tokens: HashSet::new(),
77 pools: HashMap::new(),
78 block_timestamps: BTreeMap::new(),
79 database: None,
80 }
81 }
82
83 pub async fn get_cache_block_consistency_status(
85 &self,
86 ) -> Option<CachedBlocksConsistencyStatus> {
87 let database = self.database.as_ref()?;
88 database
89 .get_block_consistency_status(&self.chain)
90 .await
91 .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
92 .ok()
93 }
94
95 #[must_use]
97 pub fn min_dex_creation_block(&self) -> Option<u64> {
98 self.dexes
99 .values()
100 .map(|dex| dex.factory_creation_block)
101 .min()
102 }
103
    /// Returns the cached timestamp for `block_number`, if present.
    #[must_use]
    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
        self.block_timestamps.get(&block_number)
    }
109
110 pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
112 let database = BlockchainCacheDatabase::init(pg_connect_options).await;
113 self.database = Some(database);
114 }
115
116 pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
122 if let Some(database) = &self.database {
123 database.toggle_perf_sync_settings(enable).await
124 } else {
125 tracing::warn!("Database not initialized, skipping performance settings toggle");
126 Ok(())
127 }
128 }
129
    /// Seeds the chain record in the database, creates the per-chain block and
    /// token partitions, then loads cached tokens into memory.
    ///
    /// All database failures are logged rather than propagated so the cache can
    /// keep operating without persistence.
    pub async fn initialize_chain(&mut self) {
        if let Some(database) = &self.database {
            // Seeding the chain row is a hard prerequisite for the partition DDL
            // below; on failure, abort initialization entirely (note: tokens are
            // not loaded in this case either).
            if let Err(e) = database.seed_chain(&self.chain).await {
                tracing::error!(
                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
                );
                return;
            }
            tracing::info!("Chain seeded in the database");

            // Partition creation failures are tolerated — inserts may still
            // succeed if the partitions already exist.
            match database.create_block_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing block partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }

            match database.create_token_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing token partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }
        }

        // Populate the in-memory token caches from the database (no-op without one).
        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }
    }
166
167 pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
173 tracing::debug!("Connecting and loading from_block {from_block}");
174
175 if let Err(e) = self.load_tokens().await {
176 tracing::error!("Error loading tokens from the database: {e}");
177 }
178
179 Ok(())
185 }
186
187 async fn load_tokens(&mut self) -> anyhow::Result<()> {
189 if let Some(database) = &self.database {
190 let (tokens, invalid_tokens) = tokio::try_join!(
191 database.load_tokens(self.chain.clone()),
192 database.load_invalid_token_addresses(self.chain.chain_id)
193 )?;
194
195 tracing::info!(
196 "Loading {} valid tokens and {} invalid tokens from cache database",
197 tokens.len(),
198 invalid_tokens.len()
199 );
200
201 self.tokens
202 .extend(tokens.into_iter().map(|token| (token.address, token)));
203 self.invalid_tokens.extend(invalid_tokens);
204 }
205 Ok(())
206 }
207
    /// Loads all pools for the given DEX from the cache database into memory and
    /// returns them.
    ///
    /// Pool rows referencing tokens that are not in the token cache, or carrying
    /// an unparseable pool identifier, are logged and skipped rather than failing
    /// the whole load.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX has not been registered or if the database
    /// query fails.
    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
        let mut loaded_pools = Vec::new();

        if let Some(database) = &self.database {
            // The DEX must be registered first so each pool can hold a handle to it.
            let dex = self
                .get_dex(dex_id)
                .ok_or_else(|| anyhow::anyhow!("DEX {dex_id:?} has not been registered"))?;
            let pool_rows = database
                .load_pools(self.chain.clone(), &dex_id.to_string())
                .await?;
            tracing::info!(
                "Loading {} pools for DEX {} from cache database",
                pool_rows.len(),
                dex_id,
            );

            for pool_row in pool_rows {
                // Both tokens must already be cached; otherwise the row is skipped.
                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
                        This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token0_address
                    );
                    continue;
                };

                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
                        This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token1_address
                    );
                    continue;
                };

                // The identifier is stored as text; a parse failure skips the row.
                let Some(pool_identifier) = pool_row.pool_identifier.parse().ok() else {
                    tracing::error!(
                        "Invalid pool identifier '{}' in database for pool {}, skipping",
                        pool_row.pool_identifier,
                        pool_row.address
                    );
                    continue;
                };
                let mut pool = Pool::new(
                    self.chain.clone(),
                    dex.clone(),
                    pool_row.address,
                    pool_identifier,
                    pool_row.creation_block as u64,
                    token0.clone(),
                    token1.clone(),
                    pool_row.fee.map(|fee| fee as u32),
                    pool_row
                        .tick_spacing
                        .map(|tick_spacing| tick_spacing as u32),
                    UnixNanos::default(),
                );

                // Optional hook address: applied only when present and parseable.
                if let Some(ref hook_address_str) = pool_row.hook_address
                    && let Ok(hooks) = hook_address_str.parse()
                {
                    pool.set_hooks(hooks);
                }

                // Mark the pool initialized only when both the stored sqrt price
                // parses and an initial tick is present.
                if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96
                    && let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse()
                    && let Some(initial_tick) = pool_row.initial_tick
                {
                    pool.initialize(initial_sqrt_price_x96, initial_tick);
                }

                // Return an owned copy and cache a shared handle.
                loaded_pools.push(pool.clone());
                self.pools.insert(pool.pool_identifier, Arc::new(pool));
            }
        }
        Ok(loaded_pools)
    }
304
305 #[allow(dead_code)]
308 async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
309 if let Some(database) = &self.database {
310 let block_timestamps = database
311 .load_block_timestamps(self.chain.clone(), from_block)
312 .await?;
313
314 if !block_timestamps.is_empty() {
316 let first = block_timestamps.first().unwrap().number;
317 let last = block_timestamps.last().unwrap().number;
318 let expected_len = (last - first + 1) as usize;
319 if block_timestamps.len() != expected_len {
320 anyhow::bail!(
321 "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
322 block_timestamps.len()
323 );
324 }
325 }
326
327 if block_timestamps.is_empty() {
328 tracing::info!("No blocks found in database");
329 return Ok(());
330 }
331
332 tracing::info!(
333 "Loading {} blocks timestamps from the cache database with last block number {}",
334 block_timestamps.len(),
335 block_timestamps.last().unwrap().number,
336 );
337 for block in block_timestamps {
338 self.block_timestamps.insert(block.number, block.timestamp);
339 }
340 }
341 Ok(())
342 }
343
344 pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
350 if let Some(database) = &self.database {
351 database.add_block(self.chain.chain_id, &block).await?;
352 }
353 self.block_timestamps.insert(block.number, block.timestamp);
354 Ok(())
355 }
356
357 pub async fn add_blocks_batch(
363 &mut self,
364 blocks: Vec<Block>,
365 use_copy_command: bool,
366 ) -> anyhow::Result<()> {
367 if blocks.is_empty() {
368 return Ok(());
369 }
370
371 if let Some(database) = &self.database {
372 if use_copy_command {
373 database
374 .add_blocks_copy(self.chain.chain_id, &blocks)
375 .await?;
376 } else {
377 database
378 .add_blocks_batch(self.chain.chain_id, &blocks)
379 .await?;
380 }
381 }
382
383 for block in blocks {
385 self.block_timestamps.insert(block.number, block.timestamp);
386 }
387
388 Ok(())
389 }
390
391 pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
397 tracing::info!("Adding dex {} to the cache", dex.name);
398
399 if let Some(database) = &self.database {
400 database.add_dex(dex.clone()).await?;
401 }
402
403 self.dexes.insert(dex.name, dex);
404 Ok(())
405 }
406
407 pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
413 if let Some(database) = &self.database {
414 database.add_pool(&pool).await?;
415 }
416
417 self.pools.insert(pool.pool_identifier, Arc::new(pool));
418 Ok(())
419 }
420
421 pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
427 if pools.is_empty() {
428 return Ok(());
429 }
430
431 if let Some(database) = &self.database {
432 database.add_pools_copy(self.chain.chain_id, &pools).await?;
433 }
434 self.pools.extend(
435 pools
436 .into_iter()
437 .map(|pool| (pool.pool_identifier, Arc::new(pool))),
438 );
439
440 Ok(())
441 }
442
443 pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
449 if let Some(database) = &self.database {
450 database.add_token(&token).await?;
451 }
452 self.tokens.insert(token.address, token);
453 Ok(())
454 }
455
456 pub async fn add_tokens_batch(&mut self, tokens: Vec<Token>) -> anyhow::Result<()> {
462 if tokens.is_empty() {
463 return Ok(());
464 }
465
466 if let Some(database) = &self.database {
467 database
468 .add_tokens_copy(self.chain.chain_id, &tokens)
469 .await?;
470 }
471
472 self.tokens
473 .extend(tokens.into_iter().map(|token| (token.address, token)));
474
475 Ok(())
476 }
477
478 pub fn insert_token_in_memory(&mut self, token: Token) {
480 self.tokens.insert(token.address, token);
481 }
482
483 pub fn insert_invalid_token_in_memory(&mut self, address: Address) {
485 self.invalid_tokens.insert(address);
486 }
487
488 pub async fn add_invalid_token(
494 &mut self,
495 address: Address,
496 error_string: &str,
497 ) -> anyhow::Result<()> {
498 if let Some(database) = &self.database {
499 database
500 .add_invalid_token(self.chain.chain_id, &address, error_string)
501 .await?;
502 }
503 self.invalid_tokens.insert(address);
504 Ok(())
505 }
506
507 pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
513 if let Some(database) = &self.database {
514 database.add_swap(self.chain.chain_id, swap).await?;
515 }
516
517 Ok(())
518 }
519
520 pub async fn add_liquidity_update(
526 &self,
527 liquidity_update: &PoolLiquidityUpdate,
528 ) -> anyhow::Result<()> {
529 if let Some(database) = &self.database {
530 database
531 .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
532 .await?;
533 }
534
535 Ok(())
536 }
537
538 pub async fn add_pool_swaps_batch(
544 &self,
545 swaps: &[PoolSwap],
546 use_copy_command: bool,
547 ) -> anyhow::Result<()> {
548 if let Some(database) = &self.database {
549 if use_copy_command {
550 database
551 .add_pool_swaps_copy(self.chain.chain_id, swaps)
552 .await?;
553 } else {
554 database
555 .add_pool_swaps_batch(self.chain.chain_id, swaps)
556 .await?;
557 }
558 }
559
560 Ok(())
561 }
562
563 pub async fn add_pool_liquidity_updates_batch(
569 &self,
570 updates: &[PoolLiquidityUpdate],
571 use_copy_command: bool,
572 ) -> anyhow::Result<()> {
573 if let Some(database) = &self.database {
574 if use_copy_command {
575 database
576 .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
577 .await?;
578 } else {
579 database
580 .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
581 .await?;
582 }
583 }
584
585 Ok(())
586 }
587
588 pub async fn add_pool_fee_collects_batch(
594 &self,
595 collects: &[PoolFeeCollect],
596 use_copy_command: bool,
597 ) -> anyhow::Result<()> {
598 if let Some(database) = &self.database {
599 if use_copy_command {
600 database
601 .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
602 .await?;
603 } else {
604 database
605 .add_pool_collects_batch(self.chain.chain_id, collects)
606 .await?;
607 }
608 }
609
610 Ok(())
611 }
612
613 pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
619 if let Some(database) = &self.database {
620 database
621 .add_pool_flash_batch(self.chain.chain_id, flash_events)
622 .await?;
623 }
624
625 Ok(())
626 }
627
    /// Persists a pool snapshot along with its positions and ticks when a database
    /// is configured; otherwise a no-op.
    ///
    /// The snapshot row is written first, then positions and ticks are batched,
    /// each stamped with the snapshot's block position.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the database writes fail.
    pub async fn add_pool_snapshot(
        &self,
        dex: &DexType,
        pool_identifier: &PoolIdentifier,
        snapshot: &PoolSnapshot,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .add_pool_snapshot(self.chain.chain_id, dex, pool_identifier, snapshot)
                .await?;

            // Positions are cloned and keyed by the pool identifier for the batch write.
            let positions: Vec<(PoolIdentifier, PoolPosition)> = snapshot
                .positions
                .iter()
                .map(|pos| (*pool_identifier, pos.clone()))
                .collect();
            if !positions.is_empty() {
                database
                    .add_pool_positions_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &positions,
                    )
                    .await?;
            }

            // Ticks are written by reference, keyed the same way.
            let ticks: Vec<(PoolIdentifier, &PoolTick)> = snapshot
                .ticks
                .iter()
                .map(|tick| (*pool_identifier, tick))
                .collect();
            if !ticks.is_empty() {
                database
                    .add_pool_ticks_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &ticks,
                    )
                    .await?;
            }
        }

        Ok(())
    }
687
688 pub async fn update_pool_initialize_price_tick(
694 &mut self,
695 initialize_event: &InitializeEvent,
696 ) -> anyhow::Result<()> {
697 if let Some(database) = &self.database {
698 database
699 .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
700 .await?;
701 }
702
703 let pool_identifier = initialize_event.pool_identifier;
705 if let Some(cached_pool) = self.pools.get(&pool_identifier) {
706 let mut updated_pool = (**cached_pool).clone();
707 updated_pool.initialize(initialize_event.sqrt_price_x96, initialize_event.tick);
708
709 self.pools.insert(pool_identifier, Arc::new(updated_pool));
710 }
711
712 Ok(())
713 }
714
715 #[must_use]
717 pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
718 self.dexes.get(dex_id).cloned()
719 }
720
721 #[must_use]
723 pub fn get_registered_dexes(&self) -> HashSet<DexType> {
724 self.dexes.keys().copied().collect()
725 }
726
    /// Returns the cached pool for `pool_identifier`, if present.
    #[must_use]
    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> Option<&SharedPool> {
        self.pools.get(pool_identifier)
    }
732
    /// Returns the cached token at `address`, if present.
    #[must_use]
    pub fn get_token(&self, address: &Address) -> Option<&Token> {
        self.tokens.get(address)
    }
738
    /// Returns `true` if `address` was previously recorded as an invalid token.
    #[must_use]
    pub fn is_invalid_token(&self, address: &Address) -> bool {
        self.invalid_tokens.contains(address)
    }
747
748 pub async fn update_dex_last_synced_block(
754 &self,
755 dex: &DexType,
756 block_number: u64,
757 ) -> anyhow::Result<()> {
758 if let Some(database) = &self.database {
759 database
760 .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
761 .await
762 } else {
763 Ok(())
764 }
765 }
766
767 pub async fn update_pool_last_synced_block(
773 &self,
774 dex: &DexType,
775 pool_identifier: &PoolIdentifier,
776 block_number: u64,
777 ) -> anyhow::Result<()> {
778 if let Some(database) = &self.database {
779 database
780 .update_pool_last_synced_block(
781 self.chain.chain_id,
782 dex,
783 pool_identifier,
784 block_number,
785 )
786 .await
787 } else {
788 Ok(())
789 }
790 }
791
792 pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
798 if let Some(database) = &self.database {
799 database
800 .get_dex_last_synced_block(self.chain.chain_id, dex)
801 .await
802 } else {
803 Ok(None)
804 }
805 }
806
807 pub async fn get_pool_last_synced_block(
813 &self,
814 dex: &DexType,
815 pool_identifier: &PoolIdentifier,
816 ) -> anyhow::Result<Option<u64>> {
817 if let Some(database) = &self.database {
818 database
819 .get_pool_last_synced_block(self.chain.chain_id, dex, pool_identifier)
820 .await
821 } else {
822 Ok(None)
823 }
824 }
825
826 pub async fn get_pool_event_tables_last_block(
832 &self,
833 pool_identifier: &PoolIdentifier,
834 ) -> anyhow::Result<Option<u64>> {
835 if let Some(database) = &self.database {
836 let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
837 database.get_table_last_block(
838 self.chain.chain_id,
839 "pool_swap_event",
840 pool_identifier
841 ),
842 database.get_table_last_block(
843 self.chain.chain_id,
844 "pool_liquidity_event",
845 pool_identifier
846 ),
847 database.get_table_last_block(
848 self.chain.chain_id,
849 "pool_collect_event",
850 pool_identifier
851 ),
852 )?;
853
854 let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
855 .into_iter()
856 .flatten()
857 .max();
858 Ok(max_block)
859 } else {
860 Ok(None)
861 }
862 }
863}