1use std::{
23 collections::{BTreeMap, HashMap, HashSet},
24 sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30 Block, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, SharedPool, Token,
31 data::PoolFeeCollect,
32};
33use sqlx::postgres::PgConnectOptions;
34
35use crate::{
36 cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
37 events::initialize::InitializeEvent,
38};
39
40pub mod consistency;
41pub mod copy;
42pub mod database;
43pub mod rows;
44
/// In-memory cache of blockchain domain objects with optional Postgres persistence.
///
/// All lookups are served from the in-memory maps; the optional database is
/// used to persist writes and to hydrate the maps on startup/connect.
#[derive(Debug)]
pub struct BlockchainCache {
    /// The chain this cache instance serves.
    chain: SharedChain,
    /// Block number -> block timestamp; `BTreeMap` keeps blocks ordered by number.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Registered DEXes keyed by their type.
    dexes: HashMap<DexType, SharedDex>,
    /// Valid tokens keyed by contract address.
    tokens: HashMap<Address, Token>,
    /// Addresses of tokens that failed validation and should be skipped.
    invalid_tokens: HashSet<Address>,
    /// Pools keyed by pool contract address.
    pools: HashMap<Address, SharedPool>,
    /// Optional persistence backend; `None` means purely in-memory operation.
    database: Option<BlockchainCacheDatabase>,
}
63
64impl BlockchainCache {
65 #[must_use]
67 pub fn new(chain: SharedChain) -> Self {
68 Self {
69 chain,
70 dexes: HashMap::new(),
71 tokens: HashMap::new(),
72 invalid_tokens: HashSet::new(),
73 pools: HashMap::new(),
74 block_timestamps: BTreeMap::new(),
75 database: None,
76 }
77 }
78
79 pub async fn get_cache_block_consistency_status(
81 &self,
82 ) -> Option<CachedBlocksConsistencyStatus> {
83 let database = self.database.as_ref()?;
84 database
85 .get_block_consistency_status(&self.chain)
86 .await
87 .map_err(|e| tracing::error!("Error getting block consistency status: {e}"))
88 .ok()
89 }
90
91 #[must_use]
93 pub fn min_dex_creation_block(&self) -> Option<u64> {
94 self.dexes
95 .values()
96 .map(|dex| dex.factory_creation_block)
97 .min()
98 }
99
100 #[must_use]
102 pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
103 self.block_timestamps.get(&block_number)
104 }
105
106 pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
108 let database = BlockchainCacheDatabase::init(pg_connect_options).await;
109 self.database = Some(database);
110 }
111
112 pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
118 if let Some(database) = &self.database {
119 database.toggle_perf_sync_settings(enable).await
120 } else {
121 tracing::warn!("Database not initialized, skipping performance settings toggle");
122 Ok(())
123 }
124 }
125
126 pub async fn initialize_chain(&mut self) {
131 if let Some(database) = &self.database {
133 if let Err(e) = database.seed_chain(&self.chain).await {
134 tracing::error!(
135 "Error seeding chain in database: {e}. Continuing without database cache functionality"
136 );
137 return;
138 }
139 tracing::info!("Chain seeded in the database");
140
141 match database.create_block_partition(&self.chain).await {
142 Ok(message) => tracing::info!("Executing block partition creation: {}", message),
143 Err(e) => tracing::error!(
144 "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
145 self.chain.chain_id
146 ),
147 }
148
149 match database.create_token_partition(&self.chain).await {
150 Ok(message) => tracing::info!("Executing token partition creation: {}", message),
151 Err(e) => tracing::error!(
152 "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
153 self.chain.chain_id
154 ),
155 }
156 }
157
158 if let Err(e) = self.load_tokens().await {
159 tracing::error!("Error loading tokens from the database: {e}");
160 }
161 }
162
163 pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
169 tracing::debug!("Connecting and loading from_block {from_block}");
170
171 if let Err(e) = self.load_tokens().await {
172 tracing::error!("Error loading tokens from the database: {e}");
173 }
174
175 Ok(())
181 }
182
183 async fn load_tokens(&mut self) -> anyhow::Result<()> {
185 if let Some(database) = &self.database {
186 let (tokens, invalid_tokens) = tokio::try_join!(
187 database.load_tokens(self.chain.clone()),
188 database.load_invalid_token_addresses(self.chain.chain_id)
189 )?;
190
191 tracing::info!(
192 "Loading {} valid tokens and {} invalid tokens from cache database",
193 tokens.len(),
194 invalid_tokens.len()
195 );
196
197 self.tokens
198 .extend(tokens.into_iter().map(|token| (token.address, token)));
199 self.invalid_tokens.extend(invalid_tokens);
200 }
201 Ok(())
202 }
203
204 pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<()> {
210 if let Some(database) = &self.database {
211 let dex = self
212 .get_dex(dex_id)
213 .ok_or_else(|| anyhow::anyhow!("DEX {:?} has not been registered", dex_id))?;
214 let pool_rows = database
215 .load_pools(self.chain.clone(), &dex_id.to_string())
216 .await?;
217 tracing::info!(
218 "Loading {} pools for DEX {} from cache database",
219 pool_rows.len(),
220 dex_id,
221 );
222
223 for pool_row in pool_rows {
224 let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
225 token
226 } else {
227 tracing::error!(
228 "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
229 This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
230 pool_row.address,
231 dex_id,
232 pool_row.token0_address
233 );
234 continue;
235 };
236
237 let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
238 token
239 } else {
240 tracing::error!(
241 "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
242 This may indicate the token was not properly loaded from the database or the pool references an unknown token.",
243 pool_row.address,
244 dex_id,
245 pool_row.token1_address
246 );
247 continue;
248 };
249
250 let pool = Pool::new(
252 self.chain.clone(),
253 dex.clone(),
254 pool_row.address,
255 pool_row.creation_block as u64,
256 token0.clone(),
257 token1.clone(),
258 pool_row.fee.map(|fee| fee as u32),
259 pool_row
260 .tick_spacing
261 .map(|tick_spacing| tick_spacing as u32),
262 UnixNanos::default(), );
264
265 self.pools.insert(pool.address, Arc::new(pool));
267 }
268 }
269 Ok(())
270 }
271
272 #[allow(dead_code)] async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
276 if let Some(database) = &self.database {
277 let block_timestamps = database
278 .load_block_timestamps(self.chain.clone(), from_block)
279 .await?;
280
281 if !block_timestamps.is_empty() {
283 let first = block_timestamps.first().unwrap().number;
284 let last = block_timestamps.last().unwrap().number;
285 let expected_len = (last - first + 1) as usize;
286 if block_timestamps.len() != expected_len {
287 anyhow::bail!(
288 "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
289 block_timestamps.len()
290 );
291 }
292 }
293
294 if block_timestamps.is_empty() {
295 tracing::info!("No blocks found in database");
296 return Ok(());
297 }
298
299 tracing::info!(
300 "Loading {} blocks timestamps from the cache database with last block number {}",
301 block_timestamps.len(),
302 block_timestamps.last().unwrap().number,
303 );
304 for block in block_timestamps {
305 self.block_timestamps.insert(block.number, block.timestamp);
306 }
307 }
308 Ok(())
309 }
310
311 pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
317 if let Some(database) = &self.database {
318 database.add_block(self.chain.chain_id, &block).await?;
319 }
320 self.block_timestamps.insert(block.number, block.timestamp);
321 Ok(())
322 }
323
324 pub async fn add_blocks_batch(
330 &mut self,
331 blocks: Vec<Block>,
332 use_copy_command: bool,
333 ) -> anyhow::Result<()> {
334 if blocks.is_empty() {
335 return Ok(());
336 }
337
338 if let Some(database) = &self.database {
339 if use_copy_command {
340 database
341 .add_blocks_copy(self.chain.chain_id, &blocks)
342 .await?;
343 } else {
344 database
345 .add_blocks_batch(self.chain.chain_id, &blocks)
346 .await?;
347 }
348 }
349
350 for block in blocks {
352 self.block_timestamps.insert(block.number, block.timestamp);
353 }
354
355 Ok(())
356 }
357
358 pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
364 tracing::info!("Adding dex {} to the cache", dex.name);
365
366 if let Some(database) = &self.database {
367 database.add_dex(dex.clone()).await?;
368 }
369
370 self.dexes.insert(dex.name, dex);
371 Ok(())
372 }
373
374 pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
380 let pool_address = pool.address;
381 if let Some(database) = &self.database {
382 database.add_pool(&pool).await?;
383 }
384
385 self.pools.insert(pool_address, Arc::new(pool));
386 Ok(())
387 }
388
389 pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
395 if pools.is_empty() {
396 return Ok(());
397 }
398
399 if let Some(database) = &self.database {
400 database.add_pools_batch(&pools).await?;
401 }
402
403 Ok(())
404 }
405
406 pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
412 if let Some(database) = &self.database {
413 database.add_token(&token).await?;
414 }
415 self.tokens.insert(token.address, token);
416 Ok(())
417 }
418
419 pub async fn add_invalid_token(
425 &mut self,
426 address: Address,
427 error_string: &str,
428 ) -> anyhow::Result<()> {
429 if let Some(database) = &self.database {
430 database
431 .add_invalid_token(self.chain.chain_id, &address, error_string)
432 .await?;
433 }
434 self.invalid_tokens.insert(address);
435 Ok(())
436 }
437
438 pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
444 if let Some(database) = &self.database {
445 database.add_swap(self.chain.chain_id, swap).await?;
446 }
447
448 Ok(())
449 }
450
451 pub async fn add_liquidity_update(
457 &self,
458 liquidity_update: &PoolLiquidityUpdate,
459 ) -> anyhow::Result<()> {
460 if let Some(database) = &self.database {
461 database
462 .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
463 .await?;
464 }
465
466 Ok(())
467 }
468
469 pub async fn add_pool_swaps_batch(
475 &self,
476 swaps: &[PoolSwap],
477 use_copy_command: bool,
478 ) -> anyhow::Result<()> {
479 if let Some(database) = &self.database {
480 if use_copy_command {
481 database
482 .add_pool_swaps_copy(self.chain.chain_id, swaps)
483 .await?;
484 } else {
485 database
486 .add_pool_swaps_batch(self.chain.chain_id, swaps)
487 .await?;
488 }
489 }
490
491 Ok(())
492 }
493
494 pub async fn add_pool_liquidity_updates_batch(
500 &self,
501 updates: &[PoolLiquidityUpdate],
502 use_copy_command: bool,
503 ) -> anyhow::Result<()> {
504 if let Some(database) = &self.database {
505 if use_copy_command {
506 database
507 .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
508 .await?;
509 } else {
510 database
511 .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
512 .await?;
513 }
514 }
515
516 Ok(())
517 }
518
519 pub async fn add_pool_fee_collects_batch(
525 &self,
526 collects: &[PoolFeeCollect],
527 use_copy_command: bool,
528 ) -> anyhow::Result<()> {
529 if let Some(database) = &self.database {
530 if use_copy_command {
531 database
532 .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
533 .await?;
534 } else {
535 database
536 .add_pool_collects_batch(self.chain.chain_id, collects)
537 .await?;
538 }
539 }
540
541 Ok(())
542 }
543
544 pub async fn update_pool_initialize_price_tick(
545 &self,
546 initialize_event: &InitializeEvent,
547 ) -> anyhow::Result<()> {
548 if let Some(database) = &self.database {
549 database
550 .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
551 .await?;
552 }
553
554 Ok(())
555 }
556
557 #[must_use]
559 pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
560 self.dexes.get(dex_id).cloned()
561 }
562
563 #[must_use]
565 pub fn get_registered_dexes(&self) -> HashSet<DexType> {
566 self.dexes.keys().copied().collect()
567 }
568
569 #[must_use]
571 pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
572 self.pools.get(address)
573 }
574
575 #[must_use]
577 pub fn get_token(&self, address: &Address) -> Option<&Token> {
578 self.tokens.get(address)
579 }
580
581 #[must_use]
586 pub fn is_invalid_token(&self, address: &Address) -> bool {
587 self.invalid_tokens.contains(address)
588 }
589
590 pub async fn update_dex_last_synced_block(
596 &self,
597 dex: &DexType,
598 block_number: u64,
599 ) -> anyhow::Result<()> {
600 if let Some(database) = &self.database {
601 database
602 .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
603 .await
604 } else {
605 Ok(())
606 }
607 }
608
609 pub async fn update_pool_last_synced_block(
610 &self,
611 dex: &DexType,
612 pool_address: &Address,
613 block_number: u64,
614 ) -> anyhow::Result<()> {
615 if let Some(database) = &self.database {
616 database
617 .update_pool_last_synced_block(self.chain.chain_id, dex, pool_address, block_number)
618 .await
619 } else {
620 Ok(())
621 }
622 }
623
624 pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
630 if let Some(database) = &self.database {
631 database
632 .get_dex_last_synced_block(self.chain.chain_id, dex)
633 .await
634 } else {
635 Ok(None)
636 }
637 }
638
639 pub async fn get_pool_last_synced_block(
640 &self,
641 dex: &DexType,
642 pool_address: &Address,
643 ) -> anyhow::Result<Option<u64>> {
644 if let Some(database) = &self.database {
645 database
646 .get_pool_last_synced_block(self.chain.chain_id, dex, pool_address)
647 .await
648 } else {
649 Ok(None)
650 }
651 }
652
653 pub async fn get_pool_event_tables_last_block(
659 &self,
660 pool_address: &Address,
661 ) -> anyhow::Result<Option<u64>> {
662 if let Some(database) = &self.database {
663 let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
664 database.get_table_last_block(self.chain.chain_id, "pool_swap_event", pool_address),
665 database.get_table_last_block(
666 self.chain.chain_id,
667 "pool_liquidity_event",
668 pool_address
669 ),
670 database.get_table_last_block(
671 self.chain.chain_id,
672 "pool_collect_event",
673 pool_address
674 ),
675 )?;
676
677 let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
678 .into_iter()
679 .filter_map(|x| x)
680 .max();
681 Ok(max_block)
682 } else {
683 Ok(None)
684 }
685 }
686}