1use alloy::primitives::{Address, U256};
17use nautilus_model::defi::{
18 Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
19 data::PoolFeeCollect, validation::validate_address,
20};
21use sqlx::{PgPool, postgres::PgConnectOptions};
22
23use crate::{
24 cache::{
25 consistency::CachedBlocksConsistencyStatus,
26 copy::PostgresCopyHandler,
27 rows::{BlockTimestampRow, PoolRow, TokenRow},
28 },
29 events::initialize::InitializeEvent,
30};
31
32#[derive(Debug)]
34pub struct BlockchainCacheDatabase {
35 pool: PgPool,
37}
38
39impl BlockchainCacheDatabase {
40 pub async fn init(pg_options: PgConnectOptions) -> Self {
46 let pool = sqlx::postgres::PgPoolOptions::new()
47 .max_connections(32) .min_connections(5) .acquire_timeout(std::time::Duration::from_secs(3))
50 .connect_with(pg_options)
51 .await
52 .expect("Error connecting to Postgres");
53 Self { pool }
54 }
55
56 pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
62 sqlx::query(
63 r"
64 INSERT INTO chain (
65 chain_id, name
66 ) VALUES ($1,$2)
67 ON CONFLICT (chain_id)
68 DO NOTHING
69 ",
70 )
71 .bind(chain.chain_id as i32)
72 .bind(chain.name.to_string())
73 .execute(&self.pool)
74 .await
75 .map(|_| ())
76 .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
77 }
78
79 pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
86 let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
87 .bind(chain.chain_id as i32)
88 .fetch_one(&self.pool)
89 .await
90 .map_err(|e| {
91 anyhow::anyhow!(
92 "Failed to call create_block_partition for chain {}: {e}",
93 chain.chain_id
94 )
95 })?;
96
97 Ok(result.0)
98 }
99
100 pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
107 let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
108 .bind(chain.chain_id as i32)
109 .fetch_one(&self.pool)
110 .await
111 .map_err(|e| {
112 anyhow::anyhow!(
113 "Failed to call create_token_partition for chain {}: {e}",
114 chain.chain_id
115 )
116 })?;
117
118 Ok(result.0)
119 }
120
121 pub async fn get_block_consistency_status(
127 &self,
128 chain: &Chain,
129 ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
130 tracing::info!("Fetching block consistency status");
131
132 let result: (i64, i64) = sqlx::query_as(
133 r"
134 SELECT
135 COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
136 get_last_continuous_block($1) as last_continuous_block
137 "
138 )
139 .bind(chain.chain_id as i32)
140 .fetch_one(&self.pool)
141 .await
142 .map_err(|e| {
143 anyhow::anyhow!(
144 "Failed to get block info for chain {}: {}",
145 chain.chain_id,
146 e
147 )
148 })?;
149
150 Ok(CachedBlocksConsistencyStatus::new(
151 result.0 as u64,
152 result.1 as u64,
153 ))
154 }
155
156 pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
162 sqlx::query(
163 r"
164 INSERT INTO block (
165 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
166 base_fee_per_gas, blob_gas_used, excess_blob_gas,
167 l1_gas_price, l1_gas_used, l1_fee_scalar
168 ) VALUES (
169 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
170 )
171 ON CONFLICT (chain_id, number)
172 DO UPDATE
173 SET
174 hash = $3,
175 parent_hash = $4,
176 miner = $5,
177 gas_limit = $6,
178 gas_used = $7,
179 timestamp = $8,
180 base_fee_per_gas = $9,
181 blob_gas_used = $10,
182 excess_blob_gas = $11,
183 l1_gas_price = $12,
184 l1_gas_used = $13,
185 l1_fee_scalar = $14
186 ",
187 )
188 .bind(chain_id as i32)
189 .bind(block.number as i64)
190 .bind(block.hash.as_str())
191 .bind(block.parent_hash.as_str())
192 .bind(block.miner.as_str())
193 .bind(block.gas_limit as i64)
194 .bind(block.gas_used as i64)
195 .bind(block.timestamp.to_string())
196 .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
197 .bind(block.blob_gas_used.as_ref().map(U256::to_string))
198 .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
199 .bind(block.l1_gas_price.as_ref().map(U256::to_string))
200 .bind(block.l1_gas_used.map(|v| v as i64))
201 .bind(block.l1_fee_scalar.map(|v| v as i64))
202 .execute(&self.pool)
203 .await
204 .map(|_| ())
205 .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
206 }
207
208 pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
214 if blocks.is_empty() {
215 return Ok(());
216 }
217
218 let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
220 let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
221 let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
222 let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
223 let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
224 let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
225 let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
226 let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
227 let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
228 let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
229 let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
230 let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
231 let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
232
233 for block in blocks {
235 numbers.push(block.number as i64);
236 hashes.push(block.hash.clone());
237 parent_hashes.push(block.parent_hash.clone());
238 miners.push(block.miner.to_string());
239 gas_limits.push(block.gas_limit as i64);
240 gas_useds.push(block.gas_used as i64);
241 timestamps.push(block.timestamp.to_string());
242 base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
243 blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
244 excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
245 l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
246 l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
247 l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
248 }
249
250 sqlx::query(
252 r"
253 INSERT INTO block (
254 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
255 base_fee_per_gas, blob_gas_used, excess_blob_gas,
256 l1_gas_price, l1_gas_used, l1_fee_scalar
257 )
258 SELECT
259 $1, *
260 FROM UNNEST(
261 $2::int8[], $3::text[], $4::text[], $5::text[],
262 $6::int8[], $7::int8[], $8::text[],
263 $9::text[], $10::text[], $11::text[],
264 $12::text[], $13::int8[], $14::int8[]
265 )
266 ON CONFLICT (chain_id, number) DO NOTHING
267 ",
268 )
269 .bind(chain_id as i32)
270 .bind(&numbers[..])
271 .bind(&hashes[..])
272 .bind(&parent_hashes[..])
273 .bind(&miners[..])
274 .bind(&gas_limits[..])
275 .bind(&gas_useds[..])
276 .bind(×tamps[..])
277 .bind(&base_fee_per_gases as &[Option<String>])
278 .bind(&blob_gas_useds as &[Option<String>])
279 .bind(&excess_blob_gases as &[Option<String>])
280 .bind(&l1_gas_prices as &[Option<String>])
281 .bind(&l1_gas_useds as &[Option<i64>])
282 .bind(&l1_fee_scalars as &[Option<i64>])
283 .execute(&self.pool)
284 .await
285 .map(|_| ())
286 .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
287 }
288
289 pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
298 let copy_handler = PostgresCopyHandler::new(&self.pool);
299 copy_handler.copy_blocks(chain_id, blocks).await
300 }
301
302 pub async fn add_pool_swaps_copy(
311 &self,
312 chain_id: u32,
313 swaps: &[PoolSwap],
314 ) -> anyhow::Result<()> {
315 let copy_handler = PostgresCopyHandler::new(&self.pool);
316 copy_handler.copy_pool_swaps(chain_id, swaps).await
317 }
318
319 pub async fn add_pool_liquidity_updates_copy(
328 &self,
329 chain_id: u32,
330 updates: &[PoolLiquidityUpdate],
331 ) -> anyhow::Result<()> {
332 let copy_handler = PostgresCopyHandler::new(&self.pool);
333 copy_handler
334 .copy_pool_liquidity_updates(chain_id, updates)
335 .await
336 }
337
338 pub async fn copy_pool_fee_collects_batch(
347 &self,
348 chain_id: u32,
349 collects: &[PoolFeeCollect],
350 ) -> anyhow::Result<()> {
351 let copy_handler = PostgresCopyHandler::new(&self.pool);
352 copy_handler.copy_pool_collects(chain_id, collects).await
353 }
354
355 pub async fn load_block_timestamps(
361 &self,
362 chain: SharedChain,
363 from_block: u64,
364 ) -> anyhow::Result<Vec<BlockTimestampRow>> {
365 sqlx::query_as::<_, BlockTimestampRow>(
366 r"
367 SELECT
368 number,
369 timestamp
370 FROM block
371 WHERE chain_id = $1 AND number >= $2
372 ORDER BY number ASC
373 ",
374 )
375 .bind(chain.chain_id as i32)
376 .bind(from_block as i64)
377 .fetch_all(&self.pool)
378 .await
379 .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
380 }
381
382 pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
388 sqlx::query(
389 r"
390 INSERT INTO dex (
391 chain_id, name, factory_address, creation_block
392 ) VALUES ($1, $2, $3, $4)
393 ON CONFLICT (chain_id, name)
394 DO UPDATE
395 SET
396 factory_address = $3,
397 creation_block = $4
398 ",
399 )
400 .bind(dex.chain.chain_id as i32)
401 .bind(dex.name.to_string())
402 .bind(dex.factory.to_string())
403 .bind(dex.factory_creation_block as i64)
404 .execute(&self.pool)
405 .await
406 .map(|_| ())
407 .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
408 }
409
410 pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
416 sqlx::query(
417 r"
418 INSERT INTO pool (
419 chain_id, address, dex_name, creation_block,
420 token0_chain, token0_address,
421 token1_chain, token1_address,
422 fee, tick_spacing
423 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
424 ON CONFLICT (chain_id, address)
425 DO UPDATE
426 SET
427 dex_name = $3,
428 creation_block = $4,
429 token0_chain = $5,
430 token0_address = $6,
431 token1_chain = $7,
432 token1_address = $8,
433 fee = $9,
434 tick_spacing = $10
435 ",
436 )
437 .bind(pool.chain.chain_id as i32)
438 .bind(pool.address.to_string())
439 .bind(pool.dex.name.to_string())
440 .bind(pool.creation_block as i64)
441 .bind(pool.token0.chain.chain_id as i32)
442 .bind(pool.token0.address.to_string())
443 .bind(pool.token1.chain.chain_id as i32)
444 .bind(pool.token1.address.to_string())
445 .bind(pool.fee.map(|fee| fee as i32))
446 .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
447 .execute(&self.pool)
448 .await
449 .map(|_| ())
450 .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
451 }
452
453 pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
459 if pools.is_empty() {
460 return Ok(());
461 }
462
463 let mut addresses: Vec<String> = Vec::with_capacity(pools.len());
465 let mut dex_names: Vec<String> = Vec::with_capacity(pools.len());
466 let mut creation_blocks: Vec<i64> = Vec::with_capacity(pools.len());
467 let mut token0_chains: Vec<i32> = Vec::with_capacity(pools.len());
468 let mut token0_addresses: Vec<String> = Vec::with_capacity(pools.len());
469 let mut token1_chains: Vec<i32> = Vec::with_capacity(pools.len());
470 let mut token1_addresses: Vec<String> = Vec::with_capacity(pools.len());
471 let mut fees: Vec<Option<i32>> = Vec::with_capacity(pools.len());
472 let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(pools.len());
473 let mut chain_ids: Vec<i32> = Vec::with_capacity(pools.len());
474
475 for pool in pools {
477 chain_ids.push(pool.chain.chain_id as i32);
478 addresses.push(pool.address.to_string());
479 dex_names.push(pool.dex.name.to_string());
480 creation_blocks.push(pool.creation_block as i64);
481 token0_chains.push(pool.token0.chain.chain_id as i32);
482 token0_addresses.push(pool.token0.address.to_string());
483 token1_chains.push(pool.token1.chain.chain_id as i32);
484 token1_addresses.push(pool.token1.address.to_string());
485 fees.push(pool.fee.map(|fee| fee as i32));
486 tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
487 }
488
489 sqlx::query(
491 r"
492 INSERT INTO pool (
493 chain_id, address, dex_name, creation_block,
494 token0_chain, token0_address,
495 token1_chain, token1_address,
496 fee, tick_spacing
497 )
498 SELECT *
499 FROM UNNEST(
500 $1::int4[], $2::text[], $3::text[], $4::int8[],
501 $5::int4[], $6::text[], $7::int4[], $8::text[],
502 $9::int4[], $10::int4[]
503 )
504 ON CONFLICT (chain_id, address) DO NOTHING
505 ",
506 )
507 .bind(&chain_ids[..])
508 .bind(&addresses[..])
509 .bind(&dex_names[..])
510 .bind(&creation_blocks[..])
511 .bind(&token0_chains[..])
512 .bind(&token0_addresses[..])
513 .bind(&token1_chains[..])
514 .bind(&token1_addresses[..])
515 .bind(&fees[..])
516 .bind(&tick_spacings[..])
517 .execute(&self.pool)
518 .await
519 .map(|_| ())
520 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
521 }
522
523 pub async fn add_pool_swaps_batch(
529 &self,
530 chain_id: u32,
531 swaps: &[PoolSwap],
532 ) -> anyhow::Result<()> {
533 if swaps.is_empty() {
534 return Ok(());
535 }
536
537 let mut pool_addresses: Vec<String> = Vec::with_capacity(swaps.len());
539 let mut blocks: Vec<i64> = Vec::with_capacity(swaps.len());
540 let mut transaction_hashes: Vec<String> = Vec::with_capacity(swaps.len());
541 let mut transaction_indices: Vec<i32> = Vec::with_capacity(swaps.len());
542 let mut log_indices: Vec<i32> = Vec::with_capacity(swaps.len());
543 let mut senders: Vec<String> = Vec::with_capacity(swaps.len());
544 let mut sides: Vec<String> = Vec::with_capacity(swaps.len());
545 let mut sizes: Vec<String> = Vec::with_capacity(swaps.len());
546 let mut prices: Vec<String> = Vec::with_capacity(swaps.len());
547 let mut chain_ids: Vec<i32> = Vec::with_capacity(swaps.len());
548
549 for swap in swaps {
551 chain_ids.push(chain_id as i32);
552 pool_addresses.push(swap.pool_address.to_string());
553 blocks.push(swap.block as i64);
554 transaction_hashes.push(swap.transaction_hash.clone());
555 transaction_indices.push(swap.transaction_index as i32);
556 log_indices.push(swap.log_index as i32);
557 senders.push(swap.sender.to_string());
558 sides.push(swap.side.to_string());
559 sizes.push(swap.size.to_string());
560 prices.push(swap.price.to_string());
561 }
562
563 sqlx::query(
565 r"
566 INSERT INTO pool_swap_event (
567 chain_id, pool_address, block, transaction_hash, transaction_index,
568 log_index, sender, side, size, price
569 )
570 SELECT *
571 FROM UNNEST(
572 $1::int4[], $2::text[], $3::int8[], $4::text[], $5::int4[],
573 $6::int4[], $7::text[], $8::text[], $9::text[], $10::text[]
574 )
575 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
576 ",
577 )
578 .bind(&chain_ids[..])
579 .bind(&pool_addresses[..])
580 .bind(&blocks[..])
581 .bind(&transaction_hashes[..])
582 .bind(&transaction_indices[..])
583 .bind(&log_indices[..])
584 .bind(&senders[..])
585 .bind(&sides[..])
586 .bind(&sizes[..])
587 .bind(&prices[..])
588 .execute(&self.pool)
589 .await
590 .map(|_| ())
591 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
592 }
593
594 pub async fn add_pool_liquidity_updates_batch(
600 &self,
601 chain_id: u32,
602 updates: &[PoolLiquidityUpdate],
603 ) -> anyhow::Result<()> {
604 if updates.is_empty() {
605 return Ok(());
606 }
607
608 let mut pool_addresses: Vec<String> = Vec::with_capacity(updates.len());
610 let mut blocks: Vec<i64> = Vec::with_capacity(updates.len());
611 let mut transaction_hashes: Vec<String> = Vec::with_capacity(updates.len());
612 let mut transaction_indices: Vec<i32> = Vec::with_capacity(updates.len());
613 let mut log_indices: Vec<i32> = Vec::with_capacity(updates.len());
614 let mut event_types: Vec<String> = Vec::with_capacity(updates.len());
615 let mut senders: Vec<Option<String>> = Vec::with_capacity(updates.len());
616 let mut owners: Vec<String> = Vec::with_capacity(updates.len());
617 let mut position_liquidities: Vec<String> = Vec::with_capacity(updates.len());
618 let mut amount0s: Vec<String> = Vec::with_capacity(updates.len());
619 let mut amount1s: Vec<String> = Vec::with_capacity(updates.len());
620 let mut tick_lowers: Vec<i32> = Vec::with_capacity(updates.len());
621 let mut tick_uppers: Vec<i32> = Vec::with_capacity(updates.len());
622 let mut chain_ids: Vec<i32> = Vec::with_capacity(updates.len());
623
624 for update in updates {
626 chain_ids.push(chain_id as i32);
627 pool_addresses.push(update.pool_address.to_string());
628 blocks.push(update.block as i64);
629 transaction_hashes.push(update.transaction_hash.clone());
630 transaction_indices.push(update.transaction_index as i32);
631 log_indices.push(update.log_index as i32);
632 event_types.push(update.kind.to_string());
633 senders.push(update.sender.map(|s| s.to_string()));
634 owners.push(update.owner.to_string());
635 position_liquidities.push(update.position_liquidity.to_string());
636 amount0s.push(update.amount0.to_string());
637 amount1s.push(update.amount1.to_string());
638 tick_lowers.push(update.tick_lower);
639 tick_uppers.push(update.tick_upper);
640 }
641
642 sqlx::query(
644 r"
645 INSERT INTO pool_liquidity_event (
646 chain_id, pool_address, block, transaction_hash, transaction_index,
647 log_index, event_type, sender, owner, position_liquidity,
648 amount0, amount1, tick_lower, tick_upper
649 )
650 SELECT *
651 FROM UNNEST(
652 $1::int4[], $2::text[], $3::int8[], $4::text[], $5::int4[],
653 $6::int4[], $7::text[], $8::text[], $9::text[], $10::text[],
654 $11::text[], $12::text[], $13::int4[], $14::int4[]
655 )
656 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
657 ",
658 )
659 .bind(&chain_ids[..])
660 .bind(&pool_addresses[..])
661 .bind(&blocks[..])
662 .bind(&transaction_hashes[..])
663 .bind(&transaction_indices[..])
664 .bind(&log_indices[..])
665 .bind(&event_types[..])
666 .bind(&senders[..])
667 .bind(&owners[..])
668 .bind(&position_liquidities[..])
669 .bind(&amount0s[..])
670 .bind(&amount1s[..])
671 .bind(&tick_lowers[..])
672 .bind(&tick_uppers[..])
673 .execute(&self.pool)
674 .await
675 .map(|_| ())
676 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
677 }
678
679 pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
685 sqlx::query(
686 r"
687 INSERT INTO token (
688 chain_id, address, name, symbol, decimals
689 ) VALUES ($1, $2, $3, $4, $5)
690 ON CONFLICT (chain_id, address)
691 DO UPDATE
692 SET
693 name = $3,
694 symbol = $4,
695 decimals = $5
696 ",
697 )
698 .bind(token.chain.chain_id as i32)
699 .bind(token.address.to_string())
700 .bind(token.name.as_str())
701 .bind(token.symbol.as_str())
702 .bind(i32::from(token.decimals))
703 .execute(&self.pool)
704 .await
705 .map(|_| ())
706 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
707 }
708
709 pub async fn add_invalid_token(
715 &self,
716 chain_id: u32,
717 address: &Address,
718 error_string: &str,
719 ) -> anyhow::Result<()> {
720 sqlx::query(
721 r"
722 INSERT INTO token (
723 chain_id, address, error
724 ) VALUES ($1, $2, $3)
725 ON CONFLICT (chain_id, address)
726 DO NOTHING;
727 ",
728 )
729 .bind(chain_id as i32)
730 .bind(address.to_string())
731 .bind(error_string)
732 .execute(&self.pool)
733 .await
734 .map(|_| ())
735 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
736 }
737
738 pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
744 sqlx::query(
745 r"
746 INSERT INTO pool_swap_event (
747 chain_id, pool_address, block, transaction_hash, transaction_index,
748 log_index, sender, side, size, price
749 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
750 ON CONFLICT (chain_id, transaction_hash, log_index)
751 DO NOTHING
752 ",
753 )
754 .bind(chain_id as i32)
755 .bind(swap.pool_address.to_string())
756 .bind(swap.block as i64)
757 .bind(swap.transaction_hash.as_str())
758 .bind(swap.transaction_index as i32)
759 .bind(swap.log_index as i32)
760 .bind(swap.sender.to_string())
761 .bind(swap.side.to_string())
762 .bind(swap.size.to_string())
763 .bind(swap.price.to_string())
764 .execute(&self.pool)
765 .await
766 .map(|_| ())
767 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
768 }
769
770 pub async fn add_pool_liquidity_update(
776 &self,
777 chain_id: u32,
778 liquidity_update: &PoolLiquidityUpdate,
779 ) -> anyhow::Result<()> {
780 sqlx::query(
781 r"
782 INSERT INTO pool_liquidity_event (
783 chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
784 event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
785 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
786 ON CONFLICT (chain_id, transaction_hash, log_index)
787 DO NOTHING
788 ",
789 )
790 .bind(chain_id as i32)
791 .bind(liquidity_update.pool_address.to_string())
792 .bind(liquidity_update.block as i64)
793 .bind(liquidity_update.transaction_hash.as_str())
794 .bind(liquidity_update.transaction_index as i32)
795 .bind(liquidity_update.log_index as i32)
796 .bind(liquidity_update.kind.to_string())
797 .bind(liquidity_update.sender.map(|sender| sender.to_string()))
798 .bind(liquidity_update.owner.to_string())
799 .bind(liquidity_update.position_liquidity.to_string())
800 .bind(liquidity_update.amount0.to_string())
801 .bind(liquidity_update.amount1.to_string())
802 .bind(liquidity_update.tick_lower)
803 .bind(liquidity_update.tick_upper)
804 .execute(&self.pool)
805 .await
806 .map(|_| ())
807 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
808 }
809
810 pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
819 sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
820 .bind(chain.chain_id as i32)
821 .fetch_all(&self.pool)
822 .await
823 .map(|rows| {
824 rows.into_iter()
825 .map(|token_row| {
826 Token::new(
827 chain.clone(),
828 token_row.address,
829 token_row.name,
830 token_row.symbol,
831 token_row.decimals as u8,
832 )
833 })
834 .collect::<Vec<_>>()
835 })
836 .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
837 }
838
839 pub async fn load_invalid_token_addresses(
845 &self,
846 chain_id: u32,
847 ) -> anyhow::Result<Vec<Address>> {
848 sqlx::query_as::<_, (String,)>(
849 "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
850 )
851 .bind(chain_id as i32)
852 .fetch_all(&self.pool)
853 .await?
854 .into_iter()
855 .map(|(address,)| validate_address(&address))
856 .collect::<Result<Vec<_>, _>>()
857 .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
858 }
859
860 pub async fn load_pools(
866 &self,
867 chain: SharedChain,
868 dex_id: &str,
869 ) -> anyhow::Result<Vec<PoolRow>> {
870 sqlx::query_as::<_, PoolRow>(
871 r"
872 SELECT
873 address,
874 dex_name,
875 creation_block,
876 token0_chain,
877 token0_address,
878 token1_chain,
879 token1_address,
880 fee,
881 tick_spacing
882 FROM pool
883 WHERE chain_id = $1 AND dex_name = $2
884 ORDER BY creation_block ASC
885 ",
886 )
887 .bind(chain.chain_id as i32)
888 .bind(dex_id)
889 .fetch_all(&self.pool)
890 .await
891 .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
892 }
893
894 pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
908 if enable {
909 tracing::info!("Enabling performance sync settings for bulk operations");
910
911 sqlx::query("SET synchronous_commit = OFF")
913 .execute(&self.pool)
914 .await
915 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
916
917 sqlx::query("SET work_mem = '256MB'")
919 .execute(&self.pool)
920 .await
921 .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
922
923 tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
924 } else {
925 tracing::info!("Restoring default safe database performance settings");
926
927 sqlx::query("SET synchronous_commit = ON")
929 .execute(&self.pool)
930 .await
931 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
932
933 sqlx::query("RESET work_mem")
935 .execute(&self.pool)
936 .await
937 .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
938 }
939
940 Ok(())
941 }
942
943 pub async fn update_dex_last_synced_block(
949 &self,
950 chain_id: u32,
951 dex: &DexType,
952 block_number: u64,
953 ) -> anyhow::Result<()> {
954 sqlx::query(
955 r"
956 UPDATE dex
957 SET last_full_sync_pools_block_number = $3
958 WHERE chain_id = $1 AND name = $2
959 ",
960 )
961 .bind(chain_id as i32)
962 .bind(dex.to_string())
963 .bind(block_number as i64)
964 .execute(&self.pool)
965 .await
966 .map(|_| ())
967 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
968 }
969
970 pub async fn update_pool_last_synced_block(
971 &self,
972 chain_id: u32,
973 dex: &DexType,
974 pool_address: &Address,
975 block_number: u64,
976 ) -> anyhow::Result<()> {
977 sqlx::query(
978 r"
979 UPDATE pool
980 SET last_full_sync_block_number = $4
981 WHERE chain_id = $1
982 AND dex_name = $2
983 AND address = $3
984 ",
985 )
986 .bind(chain_id as i32)
987 .bind(dex.to_string())
988 .bind(pool_address.to_string())
989 .bind(block_number as i64)
990 .execute(&self.pool)
991 .await
992 .map(|_| ())
993 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
994 }
995
996 pub async fn get_dex_last_synced_block(
1002 &self,
1003 chain_id: u32,
1004 dex: &DexType,
1005 ) -> anyhow::Result<Option<u64>> {
1006 let result = sqlx::query_as::<_, (Option<i64>,)>(
1007 r#"
1008 SELECT
1009 last_full_sync_pools_block_number
1010 FROM dex
1011 WHERE chain_id = $1
1012 AND name = $2
1013 "#,
1014 )
1015 .bind(chain_id as i32)
1016 .bind(dex.to_string())
1017 .fetch_optional(&self.pool)
1018 .await
1019 .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1020
1021 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1022 }
1023
1024 pub async fn get_pool_last_synced_block(
1025 &self,
1026 chain_id: u32,
1027 dex: &DexType,
1028 pool_address: &Address,
1029 ) -> anyhow::Result<Option<u64>> {
1030 let result = sqlx::query_as::<_, (Option<i64>,)>(
1031 r#"
1032 SELECT
1033 last_full_sync_block_number
1034 FROM pool
1035 WHERE chain_id = $1
1036 AND dex_name = $2
1037 AND address = $3
1038 "#,
1039 )
1040 .bind(chain_id as i32)
1041 .bind(dex.to_string())
1042 .bind(pool_address.to_string())
1043 .fetch_optional(&self.pool)
1044 .await
1045 .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1046
1047 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1048 }
1049
1050 pub async fn get_table_last_block(
1057 &self,
1058 chain_id: u32,
1059 table_name: &str,
1060 pool_address: &Address,
1061 ) -> anyhow::Result<Option<u64>> {
1062 let query = format!(
1063 "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
1064 table_name
1065 );
1066 let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1067 .bind(chain_id as i32)
1068 .bind(pool_address.to_string())
1069 .fetch_optional(&self.pool)
1070 .await
1071 .map_err(|e| {
1072 anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
1073 })?;
1074
1075 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1076 }
1077
1078 pub async fn add_pool_collects_batch(
1084 &self,
1085 chain_id: u32,
1086 collects: &[PoolFeeCollect],
1087 ) -> anyhow::Result<()> {
1088 if collects.is_empty() {
1089 return Ok(());
1090 }
1091
1092 let mut pool_addresses: Vec<String> = Vec::with_capacity(collects.len());
1094 let mut blocks: Vec<i64> = Vec::with_capacity(collects.len());
1095 let mut transaction_hashes: Vec<String> = Vec::with_capacity(collects.len());
1096 let mut transaction_indices: Vec<i32> = Vec::with_capacity(collects.len());
1097 let mut log_indices: Vec<i32> = Vec::with_capacity(collects.len());
1098 let mut owners: Vec<String> = Vec::with_capacity(collects.len());
1099 let mut fee0s: Vec<String> = Vec::with_capacity(collects.len());
1100 let mut fee1s: Vec<String> = Vec::with_capacity(collects.len());
1101 let mut tick_lowers: Vec<i32> = Vec::with_capacity(collects.len());
1102 let mut tick_uppers: Vec<i32> = Vec::with_capacity(collects.len());
1103 let mut chain_ids: Vec<i32> = Vec::with_capacity(collects.len());
1104
1105 for collect in collects {
1107 chain_ids.push(chain_id as i32);
1108 pool_addresses.push(collect.pool_address.to_string());
1109 blocks.push(collect.block as i64);
1110 transaction_hashes.push(collect.transaction_hash.clone());
1111 transaction_indices.push(collect.transaction_index as i32);
1112 log_indices.push(collect.log_index as i32);
1113 owners.push(collect.owner.to_string());
1114 fee0s.push(collect.fee0.to_string());
1115 fee1s.push(collect.fee1.to_string());
1116 tick_lowers.push(collect.tick_lower);
1117 tick_uppers.push(collect.tick_upper);
1118 }
1119
1120 sqlx::query(
1122 r"
1123 INSERT INTO pool_collect_event (
1124 chain_id, pool_address, block, transaction_hash, transaction_index,
1125 log_index, owner, fee0, fee1, tick_lower, tick_upper
1126 )
1127 SELECT *
1128 FROM UNNEST(
1129 $1::int4[], $2::text[], $3::int8[], $4::text[], $5::int4[],
1130 $6::int4[], $7::text[], $8::text[], $9::text[], $10::int4[], $11::int4[]
1131 )
1132 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1133 ",
1134 )
1135 .bind(&chain_ids[..])
1136 .bind(&pool_addresses[..])
1137 .bind(&blocks[..])
1138 .bind(&transaction_hashes[..])
1139 .bind(&transaction_indices[..])
1140 .bind(&log_indices[..])
1141 .bind(&owners[..])
1142 .bind(&fee0s[..])
1143 .bind(&fee1s[..])
1144 .bind(&tick_lowers[..])
1145 .bind(&tick_uppers[..])
1146 .execute(&self.pool)
1147 .await
1148 .map(|_| ())
1149 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1150 }
1151
1152 pub async fn update_pool_initial_price_tick(
1153 &self,
1154 chain_id: u32,
1155 initialize_event: &InitializeEvent,
1156 ) -> anyhow::Result<()> {
1157 sqlx::query(
1158 r"
1159 UPDATE pool
1160 SET
1161 initial_tick = $4,
1162 initial_sqrt_price_x96 = $5
1163 WHERE chain_id = $1
1164 AND dex_name = $2
1165 AND address = $3
1166 ",
1167 )
1168 .bind(chain_id as i32)
1169 .bind(initialize_event.dex.name.to_string())
1170 .bind(initialize_event.pool_address.to_string())
1171 .bind(initialize_event.tick)
1172 .bind(initialize_event.sqrt_price_x96.to_string())
1173 .execute(&self.pool)
1174 .await
1175 .map(|_| ())
1176 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1177 }
1178}