1use std::{
23 collections::{BTreeMap, HashMap, HashSet},
24 sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30 Block, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, SharedPool, Token,
31 data::{PoolFeeCollect, PoolFlash},
32 pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
33 tick_map::tick::PoolTick,
34};
35use sqlx::postgres::PgConnectOptions;
36
37use crate::{
38 cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
39 events::initialize::InitializeEvent,
40};
41
42pub mod consistency;
43pub mod copy;
44pub mod database;
45pub mod rows;
46pub mod types;
47
/// In-memory cache of blockchain domain data (blocks, DEXes, tokens, pools) with an
/// optional Postgres-backed persistence layer.
///
/// All lookups are served from the in-memory maps; the database, when attached, is
/// written through on mutation and used to warm the cache on startup.
#[derive(Debug)]
pub struct BlockchainCache {
    // The chain this cache serves; used for chain-scoped database queries.
    chain: SharedChain,
    // Block number -> block timestamp; BTreeMap keeps entries ordered by block number.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    // Registered DEXes keyed by their type/name.
    dexes: HashMap<DexType, SharedDex>,
    // Valid tokens keyed by contract address.
    tokens: HashMap<Address, Token>,
    // Addresses known to be invalid tokens (kept to avoid re-validating them).
    invalid_tokens: HashSet<Address>,
    // Pools keyed by pool contract address.
    pools: HashMap<Address, SharedPool>,
    // Optional persistence backend; `None` means in-memory-only operation.
    pub database: Option<BlockchainCacheDatabase>,
}
66
67impl BlockchainCache {
68 #[must_use]
70 pub fn new(chain: SharedChain) -> Self {
71 Self {
72 chain,
73 dexes: HashMap::new(),
74 tokens: HashMap::new(),
75 invalid_tokens: HashSet::new(),
76 pools: HashMap::new(),
77 block_timestamps: BTreeMap::new(),
78 database: None,
79 }
80 }
81
82 pub async fn get_cache_block_consistency_status(
84 &self,
85 ) -> Option<CachedBlocksConsistencyStatus> {
86 let database = self.database.as_ref()?;
87 database
88 .get_block_consistency_status(&self.chain)
89 .await
90 .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
91 .ok()
92 }
93
94 #[must_use]
96 pub fn min_dex_creation_block(&self) -> Option<u64> {
97 self.dexes
98 .values()
99 .map(|dex| dex.factory_creation_block)
100 .min()
101 }
102
103 #[must_use]
105 pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
106 self.block_timestamps.get(&block_number)
107 }
108
109 pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
111 let database = BlockchainCacheDatabase::init(pg_connect_options).await;
112 self.database = Some(database);
113 }
114
115 pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
121 if let Some(database) = &self.database {
122 database.toggle_perf_sync_settings(enable).await
123 } else {
124 tracing::warn!("Database not initialized, skipping performance settings toggle");
125 Ok(())
126 }
127 }
128
    /// Seeds the chain record and creates per-chain table partitions in the database,
    /// then warms the in-memory token maps from the database.
    ///
    /// All database errors are logged and tolerated so that startup can continue
    /// with reduced (in-memory-only) functionality instead of failing.
    pub async fn initialize_chain(&mut self) {
        if let Some(database) = &self.database {
            // If seeding fails we skip the partition DDL entirely (early return),
            // continuing without database cache functionality.
            if let Err(e) = database.seed_chain(&self.chain).await {
                tracing::error!(
                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
                );
                return;
            }
            tracing::info!("Chain seeded in the database");

            // Partition-creation failures are logged and otherwise ignored.
            match database.create_block_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing block partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }

            match database.create_token_partition(&self.chain).await {
                Ok(message) => tracing::info!("Executing token partition creation: {}", message),
                Err(e) => tracing::error!(
                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }
        }

        // Token loading runs even without a database (it is a no-op in that case);
        // failures are logged, not propagated.
        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }
    }
165
    /// Connects the cache, loading cached tokens from the database.
    ///
    /// NOTE(review): `from_block` is currently only logged; block-range loading is
    /// not yet wired here (see the under-development `load_blocks`) — confirm intent.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`; token-load failures are logged, not propagated.
    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
        tracing::debug!("Connecting and loading from_block {from_block}");

        if let Err(e) = self.load_tokens().await {
            tracing::error!("Error loading tokens from the database: {e}");
        }

        Ok(())
    }
185
186 async fn load_tokens(&mut self) -> anyhow::Result<()> {
188 if let Some(database) = &self.database {
189 let (tokens, invalid_tokens) = tokio::try_join!(
190 database.load_tokens(self.chain.clone()),
191 database.load_invalid_token_addresses(self.chain.chain_id)
192 )?;
193
194 tracing::info!(
195 "Loading {} valid tokens and {} invalid tokens from cache database",
196 tokens.len(),
197 invalid_tokens.len()
198 );
199
200 self.tokens
201 .extend(tokens.into_iter().map(|token| (token.address, token)));
202 self.invalid_tokens.extend(invalid_tokens);
203 }
204 Ok(())
205 }
206
207 pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
215 let mut loaded_pools = Vec::new();
216
217 if let Some(database) = &self.database {
218 let dex = self
219 .get_dex(dex_id)
220 .ok_or_else(|| anyhow::anyhow!("DEX {:?} has not been registered", dex_id))?;
221 let pool_rows = database
222 .load_pools(self.chain.clone(), &dex_id.to_string())
223 .await?;
224 tracing::info!(
225 "Loading {} pools for DEX {} from cache database",
226 pool_rows.len(),
227 dex_id,
228 );
229
230 for pool_row in pool_rows {
231 let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
232 token
233 } else {
234 tracing::error!(
235 "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
236 This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
237 pool_row.address,
238 dex_id,
239 pool_row.token0_address
240 );
241 continue;
242 };
243
244 let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
245 token
246 } else {
247 tracing::error!(
248 "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
249 This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
250 pool_row.address,
251 dex_id,
252 pool_row.token1_address
253 );
254 continue;
255 };
256
257 let mut pool = Pool::new(
259 self.chain.clone(),
260 dex.clone(),
261 pool_row.address,
262 pool_row.creation_block as u64,
263 token0.clone(),
264 token1.clone(),
265 pool_row.fee.map(|fee| fee as u32),
266 pool_row
267 .tick_spacing
268 .map(|tick_spacing| tick_spacing as u32),
269 UnixNanos::default(), );
271
272 if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96 {
274 if let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse() {
275 pool.initialize(initial_sqrt_price_x96);
276 }
277 }
278
279 loaded_pools.push(pool.clone());
281 self.pools.insert(pool.address, Arc::new(pool));
282 }
283 }
284 Ok(loaded_pools)
285 }
286
287 #[allow(dead_code, reason = "TODO: Under development")]
290 async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
291 if let Some(database) = &self.database {
292 let block_timestamps = database
293 .load_block_timestamps(self.chain.clone(), from_block)
294 .await?;
295
296 if !block_timestamps.is_empty() {
298 let first = block_timestamps.first().unwrap().number;
299 let last = block_timestamps.last().unwrap().number;
300 let expected_len = (last - first + 1) as usize;
301 if block_timestamps.len() != expected_len {
302 anyhow::bail!(
303 "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
304 block_timestamps.len()
305 );
306 }
307 }
308
309 if block_timestamps.is_empty() {
310 tracing::info!("No blocks found in database");
311 return Ok(());
312 }
313
314 tracing::info!(
315 "Loading {} blocks timestamps from the cache database with last block number {}",
316 block_timestamps.len(),
317 block_timestamps.last().unwrap().number,
318 );
319 for block in block_timestamps {
320 self.block_timestamps.insert(block.number, block.timestamp);
321 }
322 }
323 Ok(())
324 }
325
326 pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
332 if let Some(database) = &self.database {
333 database.add_block(self.chain.chain_id, &block).await?;
334 }
335 self.block_timestamps.insert(block.number, block.timestamp);
336 Ok(())
337 }
338
339 pub async fn add_blocks_batch(
345 &mut self,
346 blocks: Vec<Block>,
347 use_copy_command: bool,
348 ) -> anyhow::Result<()> {
349 if blocks.is_empty() {
350 return Ok(());
351 }
352
353 if let Some(database) = &self.database {
354 if use_copy_command {
355 database
356 .add_blocks_copy(self.chain.chain_id, &blocks)
357 .await?;
358 } else {
359 database
360 .add_blocks_batch(self.chain.chain_id, &blocks)
361 .await?;
362 }
363 }
364
365 for block in blocks {
367 self.block_timestamps.insert(block.number, block.timestamp);
368 }
369
370 Ok(())
371 }
372
373 pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
379 tracing::info!("Adding dex {} to the cache", dex.name);
380
381 if let Some(database) = &self.database {
382 database.add_dex(dex.clone()).await?;
383 }
384
385 self.dexes.insert(dex.name, dex);
386 Ok(())
387 }
388
389 pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
395 let pool_address = pool.address;
396 if let Some(database) = &self.database {
397 database.add_pool(&pool).await?;
398 }
399
400 self.pools.insert(pool_address, Arc::new(pool));
401 Ok(())
402 }
403
    /// Persists a batch of pools to the database. No-op for an empty batch or when
    /// no database is attached.
    ///
    /// NOTE(review): unlike `add_pool`, this does NOT insert the pools into the
    /// in-memory `pools` map — confirm whether callers are expected to cache them
    /// separately or whether this asymmetry is an oversight.
    pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
        if pools.is_empty() {
            return Ok(());
        }

        if let Some(database) = &self.database {
            database.add_pools_batch(&pools).await?;
        }

        Ok(())
    }
420
421 pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
427 if let Some(database) = &self.database {
428 database.add_token(&token).await?;
429 }
430 self.tokens.insert(token.address, token);
431 Ok(())
432 }
433
434 pub async fn add_invalid_token(
440 &mut self,
441 address: Address,
442 error_string: &str,
443 ) -> anyhow::Result<()> {
444 if let Some(database) = &self.database {
445 database
446 .add_invalid_token(self.chain.chain_id, &address, error_string)
447 .await?;
448 }
449 self.invalid_tokens.insert(address);
450 Ok(())
451 }
452
453 pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
459 if let Some(database) = &self.database {
460 database.add_swap(self.chain.chain_id, swap).await?;
461 }
462
463 Ok(())
464 }
465
466 pub async fn add_liquidity_update(
472 &self,
473 liquidity_update: &PoolLiquidityUpdate,
474 ) -> anyhow::Result<()> {
475 if let Some(database) = &self.database {
476 database
477 .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
478 .await?;
479 }
480
481 Ok(())
482 }
483
484 pub async fn add_pool_swaps_batch(
490 &self,
491 swaps: &[PoolSwap],
492 use_copy_command: bool,
493 ) -> anyhow::Result<()> {
494 if let Some(database) = &self.database {
495 if use_copy_command {
496 database
497 .add_pool_swaps_copy(self.chain.chain_id, swaps)
498 .await?;
499 } else {
500 database
501 .add_pool_swaps_batch(self.chain.chain_id, swaps)
502 .await?;
503 }
504 }
505
506 Ok(())
507 }
508
509 pub async fn add_pool_liquidity_updates_batch(
515 &self,
516 updates: &[PoolLiquidityUpdate],
517 use_copy_command: bool,
518 ) -> anyhow::Result<()> {
519 if let Some(database) = &self.database {
520 if use_copy_command {
521 database
522 .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
523 .await?;
524 } else {
525 database
526 .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
527 .await?;
528 }
529 }
530
531 Ok(())
532 }
533
534 pub async fn add_pool_fee_collects_batch(
540 &self,
541 collects: &[PoolFeeCollect],
542 use_copy_command: bool,
543 ) -> anyhow::Result<()> {
544 if let Some(database) = &self.database {
545 if use_copy_command {
546 database
547 .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
548 .await?;
549 } else {
550 database
551 .add_pool_collects_batch(self.chain.chain_id, collects)
552 .await?;
553 }
554 }
555
556 Ok(())
557 }
558
559 pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
565 if let Some(database) = &self.database {
566 database
567 .add_pool_flash_batch(self.chain.chain_id, flash_events)
568 .await?;
569 }
570
571 Ok(())
572 }
573
    /// Persists a pool snapshot together with its per-position and per-tick state.
    ///
    /// Positions and ticks are written as separate batches, each keyed by the
    /// snapshot's block position (block number / transaction index / log index).
    /// No-op when no database is attached.
    pub async fn add_pool_snapshot(
        &self,
        pool_address: &Address,
        snapshot: &PoolSnapshot,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            database
                .add_pool_snapshot(self.chain.chain_id, pool_address, snapshot)
                .await?;

            // Pair each position with the pool address, as the batch API expects.
            // Positions are cloned; ticks below are only borrowed.
            let positions: Vec<(Address, PoolPosition)> = snapshot
                .positions
                .iter()
                .map(|pos| (*pool_address, pos.clone()))
                .collect();
            if !positions.is_empty() {
                database
                    .add_pool_positions_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &positions,
                    )
                    .await?;
            }

            let ticks: Vec<(Address, &PoolTick)> = snapshot
                .ticks
                .iter()
                .map(|tick| (*pool_address, tick))
                .collect();
            if !ticks.is_empty() {
                database
                    .add_pool_ticks_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &ticks,
                    )
                    .await?;
            }
        }

        Ok(())
    }
632
633 pub async fn update_pool_initialize_price_tick(
634 &mut self,
635 initialize_event: &InitializeEvent,
636 ) -> anyhow::Result<()> {
637 if let Some(database) = &self.database {
638 database
639 .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
640 .await?;
641 }
642
643 if let Some(cached_pool) = self.pools.get(&initialize_event.pool_address) {
645 let mut updated_pool = (**cached_pool).clone();
646 updated_pool.initialize(initialize_event.sqrt_price_x96);
647
648 self.pools
649 .insert(initialize_event.pool_address, Arc::new(updated_pool));
650 }
651
652 Ok(())
653 }
654
655 #[must_use]
657 pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
658 self.dexes.get(dex_id).cloned()
659 }
660
661 #[must_use]
663 pub fn get_registered_dexes(&self) -> HashSet<DexType> {
664 self.dexes.keys().copied().collect()
665 }
666
667 #[must_use]
669 pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
670 self.pools.get(address)
671 }
672
673 #[must_use]
675 pub fn get_token(&self, address: &Address) -> Option<&Token> {
676 self.tokens.get(address)
677 }
678
679 #[must_use]
684 pub fn is_invalid_token(&self, address: &Address) -> bool {
685 self.invalid_tokens.contains(address)
686 }
687
688 pub async fn update_dex_last_synced_block(
694 &self,
695 dex: &DexType,
696 block_number: u64,
697 ) -> anyhow::Result<()> {
698 if let Some(database) = &self.database {
699 database
700 .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
701 .await
702 } else {
703 Ok(())
704 }
705 }
706
707 pub async fn update_pool_last_synced_block(
708 &self,
709 dex: &DexType,
710 pool_address: &Address,
711 block_number: u64,
712 ) -> anyhow::Result<()> {
713 if let Some(database) = &self.database {
714 database
715 .update_pool_last_synced_block(self.chain.chain_id, dex, pool_address, block_number)
716 .await
717 } else {
718 Ok(())
719 }
720 }
721
722 pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
728 if let Some(database) = &self.database {
729 database
730 .get_dex_last_synced_block(self.chain.chain_id, dex)
731 .await
732 } else {
733 Ok(None)
734 }
735 }
736
737 pub async fn get_pool_last_synced_block(
738 &self,
739 dex: &DexType,
740 pool_address: &Address,
741 ) -> anyhow::Result<Option<u64>> {
742 if let Some(database) = &self.database {
743 database
744 .get_pool_last_synced_block(self.chain.chain_id, dex, pool_address)
745 .await
746 } else {
747 Ok(None)
748 }
749 }
750
751 pub async fn get_pool_event_tables_last_block(
757 &self,
758 pool_address: &Address,
759 ) -> anyhow::Result<Option<u64>> {
760 if let Some(database) = &self.database {
761 let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
762 database.get_table_last_block(self.chain.chain_id, "pool_swap_event", pool_address),
763 database.get_table_last_block(
764 self.chain.chain_id,
765 "pool_liquidity_event",
766 pool_address
767 ),
768 database.get_table_last_block(
769 self.chain.chain_id,
770 "pool_collect_event",
771 pool_address
772 ),
773 )?;
774
775 let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
776 .into_iter()
777 .filter_map(|x| x)
778 .max();
779 Ok(max_block)
780 } else {
781 Ok(None)
782 }
783 }
784}