1use std::{
23 collections::{BTreeMap, HashMap, HashSet},
24 sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30 Block, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, SharedPool, Token,
31 data::{PoolFeeCollect, PoolFlash},
32 pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
33 tick_map::tick::PoolTick,
34};
35use sqlx::postgres::PgConnectOptions;
36
37use crate::{
38 cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
39 events::initialize::InitializeEvent,
40};
41
42pub mod consistency;
43pub mod copy;
44pub mod database;
45pub mod rows;
46pub mod types;
47
/// In-memory cache of blockchain domain objects (blocks, DEXes, tokens, pools)
/// with an optional Postgres database for persistence.
#[derive(Debug)]
pub struct BlockchainCache {
    /// The chain this cache serves.
    chain: SharedChain,
    /// Block number -> block timestamp, ordered for range traversal.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Registered DEXes keyed by DEX type.
    dexes: HashMap<DexType, SharedDex>,
    /// Valid tokens keyed by contract address.
    tokens: HashMap<Address, Token>,
    /// Addresses recorded as invalid tokens.
    invalid_tokens: HashSet<Address>,
    /// Pools keyed by pool contract address.
    pools: HashMap<Address, SharedPool>,
    /// Optional persistent cache database; `None` means in-memory only.
    pub database: Option<BlockchainCacheDatabase>,
}
66
67impl BlockchainCache {
68 #[must_use]
70 pub fn new(chain: SharedChain) -> Self {
71 Self {
72 chain,
73 dexes: HashMap::new(),
74 tokens: HashMap::new(),
75 invalid_tokens: HashSet::new(),
76 pools: HashMap::new(),
77 block_timestamps: BTreeMap::new(),
78 database: None,
79 }
80 }
81
82 pub async fn get_cache_block_consistency_status(
84 &self,
85 ) -> Option<CachedBlocksConsistencyStatus> {
86 let database = self.database.as_ref()?;
87 database
88 .get_block_consistency_status(&self.chain)
89 .await
90 .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
91 .ok()
92 }
93
94 #[must_use]
96 pub fn min_dex_creation_block(&self) -> Option<u64> {
97 self.dexes
98 .values()
99 .map(|dex| dex.factory_creation_block)
100 .min()
101 }
102
    /// Returns the cached timestamp for `block_number`, if present.
    #[must_use]
    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
        self.block_timestamps.get(&block_number)
    }
108
    /// Initializes the Postgres cache database with the given connection
    /// options and attaches it to this cache.
    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
        self.database = Some(database);
    }
114
115 pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
121 if let Some(database) = &self.database {
122 database.toggle_perf_sync_settings(enable).await
123 } else {
124 tracing::warn!("Database not initialized, skipping performance settings toggle");
125 Ok(())
126 }
127 }
128
    /// Seeds the chain record in the database, creates the per-chain block and
    /// token partitions, then loads cached tokens into memory.
    ///
    /// Partition-creation and token-loading errors are logged and swallowed so
    /// the cache keeps operating without persistence; a chain-seeding failure
    /// returns early and also skips the token load below.
    pub async fn initialize_chain(&mut self) {
        if let Some(database) = &self.database {
            if let Err(e) = database.seed_chain(&self.chain).await {
                tracing::error!(
                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
                );
                // NOTE: this early return also skips `load_tokens` below.
                return;
            }
            tracing::info!("Chain seeded in the database");

            match database.create_block_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing block partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }

            match database.create_token_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing token partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }
        }

        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }
    }
165
    /// Connects the cache, loading state starting from `from_block`.
    ///
    /// NOTE(review): currently this only loads tokens; `from_block` is just
    /// logged. Block loading appears to be under development (see the
    /// `load_blocks` helper) — confirm intended scope before relying on it.
    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
        tracing::debug!("Connecting and loading from_block {from_block}");

        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }

        Ok(())
    }
185
186 async fn load_tokens(&mut self) -> anyhow::Result<()> {
188 if let Some(database) = &self.database {
189 let (tokens, invalid_tokens) = tokio::try_join!(
190 database.load_tokens(self.chain.clone()),
191 database.load_invalid_token_addresses(self.chain.chain_id)
192 )?;
193
194 tracing::info!(
195 "Loading {} valid tokens and {} invalid tokens from cache database",
196 tokens.len(),
197 invalid_tokens.len()
198 );
199
200 self.tokens
201 .extend(tokens.into_iter().map(|token| (token.address, token)));
202 self.invalid_tokens.extend(invalid_tokens);
203 }
204 Ok(())
205 }
206
    /// Loads all pools for the given DEX from the cache database, registers
    /// each one in the in-memory pool map, and returns the loaded pools.
    ///
    /// Pools referencing token addresses missing from the token cache are
    /// logged and skipped rather than failing the whole load.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX has not been registered or a database
    /// query fails.
    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
        let mut loaded_pools = Vec::new();

        if let Some(database) = &self.database {
            let dex = self
                .get_dex(dex_id)
                .ok_or_else(|| anyhow::anyhow!("DEX {:?} has not been registered", dex_id))?;
            let pool_rows = database
                .load_pools(self.chain.clone(), &dex_id.to_string())
                .await?;
            tracing::info!(
                "Loading {} pools for DEX {} from cache database",
                pool_rows.len(),
                dex_id,
            );

            for pool_row in pool_rows {
                // Both tokens must already be cached; otherwise skip the pool.
                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
                        This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token0_address
                    );
                    continue;
                };

                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
                    token
                } else {
                    tracing::error!(
                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
                        This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
                        pool_row.address,
                        dex_id,
                        pool_row.token1_address
                    );
                    continue;
                };

                // Row numeric columns are widened/narrowed to the pool's
                // native types; ts_init is defaulted (not stored in the row).
                let mut pool = Pool::new(
                    self.chain.clone(),
                    dex.clone(),
                    pool_row.address,
                    pool_row.creation_block as u64,
                    token0.clone(),
                    token1.clone(),
                    pool_row.fee.map(|fee| fee as u32),
                    pool_row
                        .tick_spacing
                        .map(|tick_spacing| tick_spacing as u32),
                    UnixNanos::default(), );

                // Restore the initialized price if one was persisted; a
                // malformed stored value is silently ignored.
                if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96 {
                    if let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse() {
                        pool.initialize(initial_sqrt_price_x96);
                    }
                }

                loaded_pools.push(pool.clone());
                self.pools.insert(pool.address, Arc::new(pool));
            }
        }
        Ok(loaded_pools)
    }
286
287 #[allow(dead_code, reason = "TODO: Under development")]
290 async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
291 if let Some(database) = &self.database {
292 let block_timestamps = database
293 .load_block_timestamps(self.chain.clone(), from_block)
294 .await?;
295
296 if !block_timestamps.is_empty() {
298 let first = block_timestamps.first().unwrap().number;
299 let last = block_timestamps.last().unwrap().number;
300 let expected_len = (last - first + 1) as usize;
301 if block_timestamps.len() != expected_len {
302 anyhow::bail!(
303 "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
304 block_timestamps.len()
305 );
306 }
307 }
308
309 if block_timestamps.is_empty() {
310 tracing::info!("No blocks found in database");
311 return Ok(());
312 }
313
314 tracing::info!(
315 "Loading {} blocks timestamps from the cache database with last block number {}",
316 block_timestamps.len(),
317 block_timestamps.last().unwrap().number,
318 );
319 for block in block_timestamps {
320 self.block_timestamps.insert(block.number, block.timestamp);
321 }
322 }
323 Ok(())
324 }
325
    /// Persists `block` (when a database is configured) and records its
    /// timestamp in the in-memory map.
    pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database.add_block(self.chain.chain_id, &block).await?;
        }
        self.block_timestamps.insert(block.number, block.timestamp);
        Ok(())
    }
338
339 pub async fn add_blocks_batch(
345 &mut self,
346 blocks: Vec<Block>,
347 use_copy_command: bool,
348 ) -> anyhow::Result<()> {
349 if blocks.is_empty() {
350 return Ok(());
351 }
352
353 if let Some(database) = &self.database {
354 if use_copy_command {
355 database
356 .add_blocks_copy(self.chain.chain_id, &blocks)
357 .await?;
358 } else {
359 database
360 .add_blocks_batch(self.chain.chain_id, &blocks)
361 .await?;
362 }
363 }
364
365 for block in blocks {
367 self.block_timestamps.insert(block.number, block.timestamp);
368 }
369
370 Ok(())
371 }
372
    /// Registers a DEX in the cache, persisting it first when a database is
    /// configured.
    pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
        tracing::info!("Adding dex {} to the cache", dex.name);

        if let Some(database) = &self.database {
            database.add_dex(dex.clone()).await?;
        }

        self.dexes.insert(dex.name, dex);
        Ok(())
    }
388
    /// Registers a pool in the cache, persisting it first when a database is
    /// configured.
    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
        // Capture the key before `pool` is moved into the Arc.
        let pool_address = pool.address;
        if let Some(database) = &self.database {
            database.add_pool(&pool).await?;
        }

        self.pools.insert(pool_address, Arc::new(pool));
        Ok(())
    }
403
404 pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
410 if pools.is_empty() {
411 return Ok(());
412 }
413
414 if let Some(database) = &self.database {
415 database.add_pools_copy(self.chain.chain_id, &pools).await?;
416 }
417 self.pools
418 .extend(pools.into_iter().map(|pool| (pool.address, Arc::new(pool))));
419
420 Ok(())
421 }
422
    /// Registers a token in the cache, persisting it first when a database is
    /// configured.
    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database.add_token(&token).await?;
        }
        self.tokens.insert(token.address, token);
        Ok(())
    }
435
436 pub async fn add_tokens_batch(&mut self, tokens: Vec<Token>) -> anyhow::Result<()> {
442 if tokens.is_empty() {
443 return Ok(());
444 }
445
446 if let Some(database) = &self.database {
447 database
448 .add_tokens_copy(self.chain.chain_id, &tokens)
449 .await?;
450 }
451
452 self.tokens
453 .extend(tokens.into_iter().map(|token| (token.address, token)));
454
455 Ok(())
456 }
457
    /// Inserts a token into the in-memory map only (no database write).
    pub fn insert_token_in_memory(&mut self, token: Token) {
        self.tokens.insert(token.address, token);
    }
462
    /// Records an invalid token address in memory only (no database write).
    pub fn insert_invalid_token_in_memory(&mut self, address: Address) {
        self.invalid_tokens.insert(address);
    }
467
    /// Records an invalid token address along with the associated error
    /// string, persisting it first when a database is configured.
    pub async fn add_invalid_token(
        &mut self,
        address: Address,
        error_string: &str,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .add_invalid_token(self.chain.chain_id, &address, error_string)
                .await?;
        }
        self.invalid_tokens.insert(address);
        Ok(())
    }
486
487 pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
493 if let Some(database) = &self.database {
494 database.add_swap(self.chain.chain_id, swap).await?;
495 }
496
497 Ok(())
498 }
499
500 pub async fn add_liquidity_update(
506 &self,
507 liquidity_update: &PoolLiquidityUpdate,
508 ) -> anyhow::Result<()> {
509 if let Some(database) = &self.database {
510 database
511 .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
512 .await?;
513 }
514
515 Ok(())
516 }
517
518 pub async fn add_pool_swaps_batch(
524 &self,
525 swaps: &[PoolSwap],
526 use_copy_command: bool,
527 ) -> anyhow::Result<()> {
528 if let Some(database) = &self.database {
529 if use_copy_command {
530 database
531 .add_pool_swaps_copy(self.chain.chain_id, swaps)
532 .await?;
533 } else {
534 database
535 .add_pool_swaps_batch(self.chain.chain_id, swaps)
536 .await?;
537 }
538 }
539
540 Ok(())
541 }
542
543 pub async fn add_pool_liquidity_updates_batch(
549 &self,
550 updates: &[PoolLiquidityUpdate],
551 use_copy_command: bool,
552 ) -> anyhow::Result<()> {
553 if let Some(database) = &self.database {
554 if use_copy_command {
555 database
556 .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
557 .await?;
558 } else {
559 database
560 .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
561 .await?;
562 }
563 }
564
565 Ok(())
566 }
567
568 pub async fn add_pool_fee_collects_batch(
574 &self,
575 collects: &[PoolFeeCollect],
576 use_copy_command: bool,
577 ) -> anyhow::Result<()> {
578 if let Some(database) = &self.database {
579 if use_copy_command {
580 database
581 .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
582 .await?;
583 } else {
584 database
585 .add_pool_collects_batch(self.chain.chain_id, collects)
586 .await?;
587 }
588 }
589
590 Ok(())
591 }
592
593 pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
599 if let Some(database) = &self.database {
600 database
601 .add_pool_flash_batch(self.chain.chain_id, flash_events)
602 .await?;
603 }
604
605 Ok(())
606 }
607
608 pub async fn add_pool_snapshot(
619 &self,
620 pool_address: &Address,
621 snapshot: &PoolSnapshot,
622 ) -> anyhow::Result<()> {
623 if let Some(database) = &self.database {
624 database
626 .add_pool_snapshot(self.chain.chain_id, pool_address, snapshot)
627 .await?;
628
629 let positions: Vec<(Address, PoolPosition)> = snapshot
630 .positions
631 .iter()
632 .map(|pos| (*pool_address, pos.clone()))
633 .collect();
634 if !positions.is_empty() {
635 database
636 .add_pool_positions_batch(
637 self.chain.chain_id,
638 snapshot.block_position.number,
639 snapshot.block_position.transaction_index,
640 snapshot.block_position.log_index,
641 &positions,
642 )
643 .await?;
644 }
645
646 let ticks: Vec<(Address, &PoolTick)> = snapshot
647 .ticks
648 .iter()
649 .map(|tick| (*pool_address, tick))
650 .collect();
651 if !ticks.is_empty() {
652 database
653 .add_pool_ticks_batch(
654 self.chain.chain_id,
655 snapshot.block_position.number,
656 snapshot.block_position.transaction_index,
657 snapshot.block_position.log_index,
658 &ticks,
659 )
660 .await?;
661 }
662 }
663
664 Ok(())
665 }
666
667 pub async fn update_pool_initialize_price_tick(
668 &mut self,
669 initialize_event: &InitializeEvent,
670 ) -> anyhow::Result<()> {
671 if let Some(database) = &self.database {
672 database
673 .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
674 .await?;
675 }
676
677 if let Some(cached_pool) = self.pools.get(&initialize_event.pool_address) {
679 let mut updated_pool = (**cached_pool).clone();
680 updated_pool.initialize(initialize_event.sqrt_price_x96);
681
682 self.pools
683 .insert(initialize_event.pool_address, Arc::new(updated_pool));
684 }
685
686 Ok(())
687 }
688
    /// Returns a clone of the registered DEX handle for `dex_id`, if any.
    #[must_use]
    pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
        self.dexes.get(dex_id).cloned()
    }
694
    /// Returns the set of all currently registered DEX types.
    #[must_use]
    pub fn get_registered_dexes(&self) -> HashSet<DexType> {
        self.dexes.keys().copied().collect()
    }
700
    /// Returns the cached pool at `address`, if any.
    #[must_use]
    pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
        self.pools.get(address)
    }
706
    /// Returns the cached token at `address`, if any.
    #[must_use]
    pub fn get_token(&self, address: &Address) -> Option<&Token> {
        self.tokens.get(address)
    }
712
    /// Returns `true` if `address` has been recorded as an invalid token.
    #[must_use]
    pub fn is_invalid_token(&self, address: &Address) -> bool {
        self.invalid_tokens.contains(address)
    }
721
722 pub async fn update_dex_last_synced_block(
728 &self,
729 dex: &DexType,
730 block_number: u64,
731 ) -> anyhow::Result<()> {
732 if let Some(database) = &self.database {
733 database
734 .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
735 .await
736 } else {
737 Ok(())
738 }
739 }
740
741 pub async fn update_pool_last_synced_block(
742 &self,
743 dex: &DexType,
744 pool_address: &Address,
745 block_number: u64,
746 ) -> anyhow::Result<()> {
747 if let Some(database) = &self.database {
748 database
749 .update_pool_last_synced_block(self.chain.chain_id, dex, pool_address, block_number)
750 .await
751 } else {
752 Ok(())
753 }
754 }
755
756 pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
762 if let Some(database) = &self.database {
763 database
764 .get_dex_last_synced_block(self.chain.chain_id, dex)
765 .await
766 } else {
767 Ok(None)
768 }
769 }
770
771 pub async fn get_pool_last_synced_block(
772 &self,
773 dex: &DexType,
774 pool_address: &Address,
775 ) -> anyhow::Result<Option<u64>> {
776 if let Some(database) = &self.database {
777 database
778 .get_pool_last_synced_block(self.chain.chain_id, dex, pool_address)
779 .await
780 } else {
781 Ok(None)
782 }
783 }
784
785 pub async fn get_pool_event_tables_last_block(
791 &self,
792 pool_address: &Address,
793 ) -> anyhow::Result<Option<u64>> {
794 if let Some(database) = &self.database {
795 let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
796 database.get_table_last_block(self.chain.chain_id, "pool_swap_event", pool_address),
797 database.get_table_last_block(
798 self.chain.chain_id,
799 "pool_liquidity_event",
800 pool_address
801 ),
802 database.get_table_last_block(
803 self.chain.chain_id,
804 "pool_collect_event",
805 pool_address
806 ),
807 )?;
808
809 let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
810 .into_iter()
811 .filter_map(|x| x)
812 .max();
813 Ok(max_block)
814 } else {
815 Ok(None)
816 }
817 }
818}