1use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21 defi::{
22 Block, Chain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolSwap, SharedChain,
23 SharedDex, Token,
24 data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
25 pool_analysis::{
26 position::PoolPosition,
27 snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
28 },
29 tick_map::tick::PoolTick,
30 validation::validate_address,
31 },
32 identifiers::InstrumentId,
33};
34use rust_decimal::Decimal;
35use sqlx::{PgPool, Row, postgres::PgConnectOptions};
36
37use crate::{
38 cache::{
39 consistency::CachedBlocksConsistencyStatus,
40 copy::PostgresCopyHandler,
41 rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
42 types::{U128Pg, U256Pg},
43 },
44 events::initialize::InitializeEvent,
45};
46
47#[derive(Debug)]
49pub struct BlockchainCacheDatabase {
50 pool: PgPool,
52}
53
54impl BlockchainCacheDatabase {
55 pub async fn init(pg_options: PgConnectOptions) -> Self {
61 let pool = sqlx::postgres::PgPoolOptions::new()
62 .max_connections(32) .min_connections(5) .acquire_timeout(std::time::Duration::from_secs(3))
65 .connect_with(pg_options)
66 .await
67 .expect("Error connecting to Postgres");
68 Self { pool }
69 }
70
71 pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
77 sqlx::query(
78 r"
79 INSERT INTO chain (
80 chain_id, name
81 ) VALUES ($1,$2)
82 ON CONFLICT (chain_id)
83 DO NOTHING
84 ",
85 )
86 .bind(chain.chain_id as i32)
87 .bind(chain.name.to_string())
88 .execute(&self.pool)
89 .await
90 .map(|_| ())
91 .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
92 }
93
94 pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
101 let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
102 .bind(chain.chain_id as i32)
103 .fetch_one(&self.pool)
104 .await
105 .map_err(|e| {
106 anyhow::anyhow!(
107 "Failed to call create_block_partition for chain {}: {e}",
108 chain.chain_id
109 )
110 })?;
111
112 Ok(result.0)
113 }
114
115 pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
122 let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
123 .bind(chain.chain_id as i32)
124 .fetch_one(&self.pool)
125 .await
126 .map_err(|e| {
127 anyhow::anyhow!(
128 "Failed to call create_token_partition for chain {}: {e}",
129 chain.chain_id
130 )
131 })?;
132
133 Ok(result.0)
134 }
135
136 pub async fn get_block_consistency_status(
142 &self,
143 chain: &Chain,
144 ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
145 tracing::info!("Fetching block consistency status");
146
147 let result: (i64, i64) = sqlx::query_as(
148 r"
149 SELECT
150 COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
151 get_last_continuous_block($1) as last_continuous_block
152 "
153 )
154 .bind(chain.chain_id as i32)
155 .fetch_one(&self.pool)
156 .await
157 .map_err(|e| {
158 anyhow::anyhow!(
159 "Failed to get block info for chain {}: {}",
160 chain.chain_id,
161 e
162 )
163 })?;
164
165 Ok(CachedBlocksConsistencyStatus::new(
166 result.0 as u64,
167 result.1 as u64,
168 ))
169 }
170
171 pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
177 sqlx::query(
178 r"
179 INSERT INTO block (
180 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
181 base_fee_per_gas, blob_gas_used, excess_blob_gas,
182 l1_gas_price, l1_gas_used, l1_fee_scalar
183 ) VALUES (
184 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
185 )
186 ON CONFLICT (chain_id, number)
187 DO UPDATE
188 SET
189 hash = $3,
190 parent_hash = $4,
191 miner = $5,
192 gas_limit = $6,
193 gas_used = $7,
194 timestamp = $8,
195 base_fee_per_gas = $9,
196 blob_gas_used = $10,
197 excess_blob_gas = $11,
198 l1_gas_price = $12,
199 l1_gas_used = $13,
200 l1_fee_scalar = $14
201 ",
202 )
203 .bind(chain_id as i32)
204 .bind(block.number as i64)
205 .bind(block.hash.as_str())
206 .bind(block.parent_hash.as_str())
207 .bind(block.miner.as_str())
208 .bind(block.gas_limit as i64)
209 .bind(block.gas_used as i64)
210 .bind(block.timestamp.to_string())
211 .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
212 .bind(block.blob_gas_used.as_ref().map(U256::to_string))
213 .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
214 .bind(block.l1_gas_price.as_ref().map(U256::to_string))
215 .bind(block.l1_gas_used.map(|v| v as i64))
216 .bind(block.l1_fee_scalar.map(|v| v as i64))
217 .execute(&self.pool)
218 .await
219 .map(|_| ())
220 .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
221 }
222
223 pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
229 if blocks.is_empty() {
230 return Ok(());
231 }
232
233 let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
235 let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
236 let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
237 let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
238 let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
239 let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
240 let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
241 let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242 let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243 let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
244 let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
245 let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
246 let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
247
248 for block in blocks {
250 numbers.push(block.number as i64);
251 hashes.push(block.hash.clone());
252 parent_hashes.push(block.parent_hash.clone());
253 miners.push(block.miner.to_string());
254 gas_limits.push(block.gas_limit as i64);
255 gas_useds.push(block.gas_used as i64);
256 timestamps.push(block.timestamp.to_string());
257 base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
258 blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
259 excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
260 l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
261 l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
262 l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
263 }
264
265 sqlx::query(
267 r"
268 INSERT INTO block (
269 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
270 base_fee_per_gas, blob_gas_used, excess_blob_gas,
271 l1_gas_price, l1_gas_used, l1_fee_scalar
272 )
273 SELECT
274 $1, *
275 FROM UNNEST(
276 $2::int8[], $3::text[], $4::text[], $5::text[],
277 $6::int8[], $7::int8[], $8::text[],
278 $9::text[], $10::text[], $11::text[],
279 $12::text[], $13::int8[], $14::int8[]
280 )
281 ON CONFLICT (chain_id, number) DO NOTHING
282 ",
283 )
284 .bind(chain_id as i32)
285 .bind(&numbers[..])
286 .bind(&hashes[..])
287 .bind(&parent_hashes[..])
288 .bind(&miners[..])
289 .bind(&gas_limits[..])
290 .bind(&gas_useds[..])
291 .bind(×tamps[..])
292 .bind(&base_fee_per_gases as &[Option<String>])
293 .bind(&blob_gas_useds as &[Option<String>])
294 .bind(&excess_blob_gases as &[Option<String>])
295 .bind(&l1_gas_prices as &[Option<String>])
296 .bind(&l1_gas_useds as &[Option<i64>])
297 .bind(&l1_fee_scalars as &[Option<i64>])
298 .execute(&self.pool)
299 .await
300 .map(|_| ())
301 .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
302 }
303
304 pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
313 let copy_handler = PostgresCopyHandler::new(&self.pool);
314 copy_handler.copy_blocks(chain_id, blocks).await
315 }
316
317 pub async fn add_tokens_copy(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
323 let copy_handler = PostgresCopyHandler::new(&self.pool);
324 copy_handler.copy_tokens(chain_id, tokens).await
325 }
326
327 pub async fn add_pools_copy(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
333 let copy_handler = PostgresCopyHandler::new(&self.pool);
334 copy_handler.copy_pools(chain_id, pools).await
335 }
336
337 pub async fn add_pool_swaps_copy(
346 &self,
347 chain_id: u32,
348 swaps: &[PoolSwap],
349 ) -> anyhow::Result<()> {
350 let copy_handler = PostgresCopyHandler::new(&self.pool);
351 copy_handler.copy_pool_swaps(chain_id, swaps).await
352 }
353
354 pub async fn add_pool_liquidity_updates_copy(
363 &self,
364 chain_id: u32,
365 updates: &[PoolLiquidityUpdate],
366 ) -> anyhow::Result<()> {
367 let copy_handler = PostgresCopyHandler::new(&self.pool);
368 copy_handler
369 .copy_pool_liquidity_updates(chain_id, updates)
370 .await
371 }
372
373 pub async fn copy_pool_fee_collects_batch(
382 &self,
383 chain_id: u32,
384 collects: &[PoolFeeCollect],
385 ) -> anyhow::Result<()> {
386 let copy_handler = PostgresCopyHandler::new(&self.pool);
387 copy_handler.copy_pool_collects(chain_id, collects).await
388 }
389
390 pub async fn load_block_timestamps(
396 &self,
397 chain: SharedChain,
398 from_block: u64,
399 ) -> anyhow::Result<Vec<BlockTimestampRow>> {
400 sqlx::query_as::<_, BlockTimestampRow>(
401 r"
402 SELECT
403 number,
404 timestamp
405 FROM block
406 WHERE chain_id = $1 AND number >= $2
407 ORDER BY number ASC
408 ",
409 )
410 .bind(chain.chain_id as i32)
411 .bind(from_block as i64)
412 .fetch_all(&self.pool)
413 .await
414 .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
415 }
416
417 pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
423 sqlx::query(
424 r"
425 INSERT INTO dex (
426 chain_id, name, factory_address, creation_block
427 ) VALUES ($1, $2, $3, $4)
428 ON CONFLICT (chain_id, name)
429 DO UPDATE
430 SET
431 factory_address = $3,
432 creation_block = $4
433 ",
434 )
435 .bind(dex.chain.chain_id as i32)
436 .bind(dex.name.to_string())
437 .bind(dex.factory.to_string())
438 .bind(dex.factory_creation_block as i64)
439 .execute(&self.pool)
440 .await
441 .map(|_| ())
442 .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
443 }
444
445 pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
451 sqlx::query(
452 r"
453 INSERT INTO pool (
454 chain_id, address, pool_identifier, dex_name, creation_block,
455 token0_chain, token0_address,
456 token1_chain, token1_address,
457 fee, tick_spacing, initial_tick, initial_sqrt_price_x96, hook_address
458 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
459 ON CONFLICT (chain_id, dex_name, pool_identifier)
460 DO UPDATE
461 SET
462 address = $2,
463 creation_block = $5,
464 token0_chain = $6,
465 token0_address = $7,
466 token1_chain = $8,
467 token1_address = $9,
468 fee = $10,
469 tick_spacing = $11,
470 initial_tick = $12,
471 initial_sqrt_price_x96 = $13,
472 hook_address = $14
473 ",
474 )
475 .bind(pool.chain.chain_id as i32)
476 .bind(pool.address.to_string())
477 .bind(pool.pool_identifier.as_ref())
478 .bind(pool.dex.name.to_string())
479 .bind(pool.creation_block as i64)
480 .bind(pool.token0.chain.chain_id as i32)
481 .bind(pool.token0.address.to_string())
482 .bind(pool.token1.chain.chain_id as i32)
483 .bind(pool.token1.address.to_string())
484 .bind(pool.fee.map(|fee| fee as i32))
485 .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
486 .bind(pool.initial_tick)
487 .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
488 .bind(pool.hooks.as_ref().map(|h| h.to_string()))
489 .execute(&self.pool)
490 .await
491 .map(|_| ())
492 .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
493 }
494
495 pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
501 if pools.is_empty() {
502 return Ok(());
503 }
504
505 let len = pools.len();
507 let mut addresses: Vec<String> = Vec::with_capacity(len);
508 let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
509 let mut dex_names: Vec<String> = Vec::with_capacity(len);
510 let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
511 let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
512 let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
513 let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
514 let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
515 let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
516 let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
517 let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
518 let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
519 let mut hook_addresses: Vec<Option<String>> = Vec::with_capacity(len);
520 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
521
522 for pool in pools {
524 chain_ids.push(pool.chain.chain_id as i32);
525 addresses.push(pool.address.to_string());
526 pool_identifiers.push(pool.pool_identifier.to_string());
527 dex_names.push(pool.dex.name.to_string());
528 creation_blocks.push(pool.creation_block as i64);
529 token0_chains.push(pool.token0.chain.chain_id as i32);
530 token0_addresses.push(pool.token0.address.to_string());
531 token1_chains.push(pool.token1.chain.chain_id as i32);
532 token1_addresses.push(pool.token1.address.to_string());
533 fees.push(pool.fee.map(|fee| fee as i32));
534 tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
535 initial_ticks.push(pool.initial_tick);
536 initial_sqrt_price_x96s
537 .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
538 hook_addresses.push(pool.hooks.as_ref().map(|h| h.to_string()));
539 }
540
541 sqlx::query(
543 r"
544 INSERT INTO pool (
545 chain_id, address, pool_identifier, dex_name, creation_block,
546 token0_chain, token0_address,
547 token1_chain, token1_address,
548 fee, tick_spacing, initial_tick, initial_sqrt_price_x96, hook_address
549 )
550 SELECT *
551 FROM UNNEST(
552 $1::int4[], $2::text[], $3::text[], $4::text[], $5::int8[],
553 $6::int4[], $7::text[], $8::int4[], $9::text[],
554 $10::int4[], $11::int4[], $12::int4[], $13::text[], $14::text[]
555 )
556 ON CONFLICT (chain_id, dex_name, pool_identifier) DO NOTHING
557 ",
558 )
559 .bind(&chain_ids[..])
560 .bind(&addresses[..])
561 .bind(&pool_identifiers[..])
562 .bind(&dex_names[..])
563 .bind(&creation_blocks[..])
564 .bind(&token0_chains[..])
565 .bind(&token0_addresses[..])
566 .bind(&token1_chains[..])
567 .bind(&token1_addresses[..])
568 .bind(&fees[..])
569 .bind(&tick_spacings[..])
570 .bind(&initial_ticks[..])
571 .bind(&initial_sqrt_price_x96s[..])
572 .bind(&hook_addresses as &[Option<String>])
573 .execute(&self.pool)
574 .await
575 .map(|_| ())
576 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
577 }
578
579 pub async fn add_pool_swaps_batch(
585 &self,
586 chain_id: u32,
587 swaps: &[PoolSwap],
588 ) -> anyhow::Result<()> {
589 if swaps.is_empty() {
590 return Ok(());
591 }
592
593 let len = swaps.len();
595 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
596 let mut dex_names: Vec<String> = Vec::with_capacity(len);
597 let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
598 let mut blocks: Vec<i64> = Vec::with_capacity(len);
599 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
600 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
601 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
602 let mut senders: Vec<String> = Vec::with_capacity(len);
603 let mut recipients: Vec<String> = Vec::with_capacity(len);
604 let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
605 let mut liquidities: Vec<String> = Vec::with_capacity(len);
606 let mut ticks: Vec<i32> = Vec::with_capacity(len);
607 let mut amount0s: Vec<String> = Vec::with_capacity(len);
608 let mut amount1s: Vec<String> = Vec::with_capacity(len);
609 let mut order_sides: Vec<Option<String>> = Vec::with_capacity(len);
610 let mut base_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
611 let mut quote_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
612 let mut spot_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
613 let mut execution_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
614
615 for swap in swaps {
617 chain_ids.push(chain_id as i32);
618 dex_names.push(swap.dex.name.to_string());
619 pool_identifiers.push(swap.pool_identifier.to_string());
620 blocks.push(swap.block as i64);
621 transaction_hashes.push(swap.transaction_hash.clone());
622 transaction_indices.push(swap.transaction_index as i32);
623 log_indices.push(swap.log_index as i32);
624 senders.push(swap.sender.to_string());
625 recipients.push(swap.recipient.to_string());
626 sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
627 liquidities.push(swap.liquidity.to_string());
628 ticks.push(swap.tick);
629 amount0s.push(swap.amount0.to_string());
630 amount1s.push(swap.amount1.to_string());
631
632 if let Some(ref trade_info) = swap.trade_info {
634 order_sides.push(Some(trade_info.order_side.to_string()));
635 base_quantities.push(Some(trade_info.quantity_base.as_decimal()));
636 quote_quantities.push(Some(trade_info.quantity_quote.as_decimal()));
637 spot_prices.push(Some(trade_info.spot_price.as_decimal()));
638 execution_prices.push(Some(trade_info.execution_price.as_decimal()));
639 } else {
640 order_sides.push(None);
641 base_quantities.push(None);
642 quote_quantities.push(None);
643 spot_prices.push(None);
644 execution_prices.push(None);
645 }
646 }
647
648 sqlx::query(
650 r"
651 INSERT INTO pool_swap_event (
652 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
653 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
654 order_side, base_quantity, quote_quantity, spot_price, execution_price
655 )
656 SELECT
657 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index, log_index, sender, recipient,
658 sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256,
659 order_side, base_quantity, quote_quantity, spot_price, execution_price
660 FROM UNNEST(
661 $1::INT[], $2::TEXT[], $3::TEXT[], $4::BIGINT[], $5::TEXT[], $6::INT[], $7::INT[],
662 $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::INT[], $13::TEXT[], $14::TEXT[],
663 $15::TEXT[], $16, $17, $18, $19
664 ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
665 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
666 order_side, base_quantity, quote_quantity, spot_price, execution_price)
667 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
668 ",
669 )
670 .bind(&chain_ids[..])
671 .bind(&dex_names[..])
672 .bind(&pool_identifiers[..])
673 .bind(&blocks[..])
674 .bind(&transaction_hashes[..])
675 .bind(&transaction_indices[..])
676 .bind(&log_indices[..])
677 .bind(&senders[..])
678 .bind(&recipients[..])
679 .bind(&sqrt_price_x96s[..])
680 .bind(&liquidities[..])
681 .bind(&ticks[..])
682 .bind(&amount0s[..])
683 .bind(&amount1s[..])
684 .bind(&order_sides[..])
685 .bind(&base_quantities[..])
686 .bind("e_quantities[..])
687 .bind(&spot_prices[..])
688 .bind(&execution_prices[..])
689 .execute(&self.pool)
690 .await
691 .map(|_| ())
692 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
693 }
694
695 pub async fn add_pool_liquidity_updates_batch(
701 &self,
702 chain_id: u32,
703 updates: &[PoolLiquidityUpdate],
704 ) -> anyhow::Result<()> {
705 if updates.is_empty() {
706 return Ok(());
707 }
708
709 let len = updates.len();
711 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
712 let mut dex_names: Vec<String> = Vec::with_capacity(len);
713 let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
714 let mut blocks: Vec<i64> = Vec::with_capacity(len);
715 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
716 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
717 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
718 let mut event_types: Vec<String> = Vec::with_capacity(len);
719 let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
720 let mut owners: Vec<String> = Vec::with_capacity(len);
721 let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
722 let mut amount0s: Vec<String> = Vec::with_capacity(len);
723 let mut amount1s: Vec<String> = Vec::with_capacity(len);
724 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
725 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
726
727 for update in updates {
729 chain_ids.push(chain_id as i32);
730 dex_names.push(update.dex.name.to_string());
731 pool_identifiers.push(update.pool_identifier.to_string());
732 blocks.push(update.block as i64);
733 transaction_hashes.push(update.transaction_hash.clone());
734 transaction_indices.push(update.transaction_index as i32);
735 log_indices.push(update.log_index as i32);
736 event_types.push(update.kind.to_string());
737 senders.push(update.sender.map(|s| s.to_string()));
738 owners.push(update.owner.to_string());
739 position_liquidities.push(update.position_liquidity.to_string());
740 amount0s.push(update.amount0.to_string());
741 amount1s.push(update.amount1.to_string());
742 tick_lowers.push(update.tick_lower);
743 tick_uppers.push(update.tick_upper);
744 }
745
746 sqlx::query(
748 r"
749 INSERT INTO pool_liquidity_event (
750 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
751 log_index, event_type, sender, owner, position_liquidity,
752 amount0, amount1, tick_lower, tick_upper
753 )
754 SELECT
755 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
756 log_index, event_type, sender, owner, position_liquidity::u128,
757 amount0::U256, amount1::U256, tick_lower, tick_upper
758 FROM UNNEST(
759 $1::INT[], $2::TEXT[], $3::TEXT[], $4::INT[], $5::TEXT[], $6::INT[],
760 $7::INT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[],
761 $12::TEXT[], $13::TEXT[], $14::INT[], $15::INT[]
762 ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
763 log_index, event_type, sender, owner, position_liquidity,
764 amount0, amount1, tick_lower, tick_upper)
765 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
766 ",
767 )
768 .bind(&chain_ids[..])
769 .bind(&dex_names[..])
770 .bind(&pool_identifiers[..])
771 .bind(&blocks[..])
772 .bind(&transaction_hashes[..])
773 .bind(&transaction_indices[..])
774 .bind(&log_indices[..])
775 .bind(&event_types[..])
776 .bind(&senders[..])
777 .bind(&owners[..])
778 .bind(&position_liquidities[..])
779 .bind(&amount0s[..])
780 .bind(&amount1s[..])
781 .bind(&tick_lowers[..])
782 .bind(&tick_uppers[..])
783 .execute(&self.pool)
784 .await
785 .map(|_| ())
786 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
787 }
788
789 pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
795 sqlx::query(
796 r"
797 INSERT INTO token (
798 chain_id, address, name, symbol, decimals
799 ) VALUES ($1, $2, $3, $4, $5)
800 ON CONFLICT (chain_id, address)
801 DO UPDATE
802 SET
803 name = $3,
804 symbol = $4,
805 decimals = $5
806 ",
807 )
808 .bind(token.chain.chain_id as i32)
809 .bind(token.address.to_string())
810 .bind(token.name.as_str())
811 .bind(token.symbol.as_str())
812 .bind(i32::from(token.decimals))
813 .execute(&self.pool)
814 .await
815 .map(|_| ())
816 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
817 }
818
819 pub async fn add_invalid_token(
825 &self,
826 chain_id: u32,
827 address: &Address,
828 error_string: &str,
829 ) -> anyhow::Result<()> {
830 sqlx::query(
831 r"
832 INSERT INTO token (
833 chain_id, address, error
834 ) VALUES ($1, $2, $3)
835 ON CONFLICT (chain_id, address)
836 DO NOTHING;
837 ",
838 )
839 .bind(chain_id as i32)
840 .bind(address.to_string())
841 .bind(error_string)
842 .execute(&self.pool)
843 .await
844 .map(|_| ())
845 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
846 }
847
848 pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
854 let (order_side, base_quantity, quote_quantity, spot_price, execution_price) =
856 if let Some(ref trade_info) = swap.trade_info {
857 (
858 Some(trade_info.order_side.to_string()),
859 Some(trade_info.quantity_base.as_decimal()),
860 Some(trade_info.quantity_quote.as_decimal()),
861 Some(trade_info.spot_price.as_decimal()),
862 Some(trade_info.execution_price.as_decimal()),
863 )
864 } else {
865 (None, None, None, None, None)
866 };
867
868 sqlx::query(
869 r"
870 INSERT INTO pool_swap_event (
871 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
872 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
873 order_side, base_quantity, quote_quantity, spot_price, execution_price
874 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::U160, $11::U128, $12, $13::I256, $14::I256, $15, $16, $17, $18, $19)
875 ON CONFLICT (chain_id, transaction_hash, log_index)
876 DO NOTHING
877 ",
878 )
879 .bind(chain_id as i32)
880 .bind(swap.dex.name.to_string())
881 .bind(swap.pool_identifier.as_str())
882 .bind(swap.block as i64)
883 .bind(swap.transaction_hash.as_str())
884 .bind(swap.transaction_index as i32)
885 .bind(swap.log_index as i32)
886 .bind(swap.sender.to_string())
887 .bind(swap.recipient.to_string())
888 .bind(swap.sqrt_price_x96.to_string())
889 .bind(swap.liquidity.to_string())
890 .bind(swap.tick)
891 .bind(swap.amount0.to_string())
892 .bind(swap.amount1.to_string())
893 .bind(order_side)
894 .bind(base_quantity)
895 .bind(quote_quantity)
896 .bind(spot_price)
897 .bind(execution_price)
898 .execute(&self.pool)
899 .await
900 .map(|_| ())
901 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
902 }
903
904 pub async fn add_pool_liquidity_update(
910 &self,
911 chain_id: u32,
912 liquidity_update: &PoolLiquidityUpdate,
913 ) -> anyhow::Result<()> {
914 sqlx::query(
915 r"
916 INSERT INTO pool_liquidity_event (
917 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index, log_index,
918 event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
919 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
920 ON CONFLICT (chain_id, transaction_hash, log_index)
921 DO NOTHING
922 ",
923 )
924 .bind(chain_id as i32)
925 .bind(liquidity_update.dex.name.to_string())
926 .bind(liquidity_update.pool_identifier.as_str())
927 .bind(liquidity_update.block as i64)
928 .bind(liquidity_update.transaction_hash.as_str())
929 .bind(liquidity_update.transaction_index as i32)
930 .bind(liquidity_update.log_index as i32)
931 .bind(liquidity_update.kind.to_string())
932 .bind(liquidity_update.sender.map(|sender| sender.to_string()))
933 .bind(liquidity_update.owner.to_string())
934 .bind(U128Pg(liquidity_update.position_liquidity))
935 .bind(U256Pg(liquidity_update.amount0))
936 .bind(U256Pg(liquidity_update.amount1))
937 .bind(liquidity_update.tick_lower)
938 .bind(liquidity_update.tick_upper)
939 .execute(&self.pool)
940 .await
941 .map(|_| ())
942 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
943 }
944
945 pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
954 sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
955 .bind(chain.chain_id as i32)
956 .fetch_all(&self.pool)
957 .await
958 .map(|rows| {
959 rows.into_iter()
960 .map(|token_row| {
961 Token::new(
962 chain.clone(),
963 token_row.address,
964 token_row.name,
965 token_row.symbol,
966 token_row.decimals as u8,
967 )
968 })
969 .collect::<Vec<_>>()
970 })
971 .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
972 }
973
974 pub async fn load_invalid_token_addresses(
980 &self,
981 chain_id: u32,
982 ) -> anyhow::Result<Vec<Address>> {
983 sqlx::query_as::<_, (String,)>(
984 "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
985 )
986 .bind(chain_id as i32)
987 .fetch_all(&self.pool)
988 .await?
989 .into_iter()
990 .map(|(address,)| validate_address(&address))
991 .collect::<Result<Vec<_>, _>>()
992 .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
993 }
994
995 pub async fn load_pools(
1001 &self,
1002 chain: SharedChain,
1003 dex_id: &str,
1004 ) -> anyhow::Result<Vec<PoolRow>> {
1005 sqlx::query_as::<_, PoolRow>(
1006 r"
1007 SELECT
1008 address,
1009 pool_identifier,
1010 dex_name,
1011 creation_block,
1012 token0_chain,
1013 token0_address,
1014 token1_chain,
1015 token1_address,
1016 fee,
1017 tick_spacing,
1018 initial_tick,
1019 initial_sqrt_price_x96,
1020 hook_address
1021 FROM pool
1022 WHERE chain_id = $1 AND dex_name = $2
1023 ORDER BY creation_block ASC
1024 ",
1025 )
1026 .bind(chain.chain_id as i32)
1027 .bind(dex_id)
1028 .fetch_all(&self.pool)
1029 .await
1030 .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
1031 }
1032
1033 pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
1047 if enable {
1048 tracing::info!("Enabling performance sync settings for bulk operations");
1049
1050 sqlx::query("SET synchronous_commit = OFF")
1052 .execute(&self.pool)
1053 .await
1054 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
1055
1056 sqlx::query("SET work_mem = '256MB'")
1058 .execute(&self.pool)
1059 .await
1060 .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
1061
1062 tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
1063 } else {
1064 tracing::info!("Restoring default safe database performance settings");
1065
1066 sqlx::query("SET synchronous_commit = ON")
1068 .execute(&self.pool)
1069 .await
1070 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
1071
1072 sqlx::query("RESET work_mem")
1074 .execute(&self.pool)
1075 .await
1076 .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
1077 }
1078
1079 Ok(())
1080 }
1081
1082 pub async fn update_dex_last_synced_block(
1088 &self,
1089 chain_id: u32,
1090 dex: &DexType,
1091 block_number: u64,
1092 ) -> anyhow::Result<()> {
1093 sqlx::query(
1094 r"
1095 UPDATE dex
1096 SET last_full_sync_pools_block_number = $3
1097 WHERE chain_id = $1 AND name = $2
1098 ",
1099 )
1100 .bind(chain_id as i32)
1101 .bind(dex.to_string())
1102 .bind(block_number as i64)
1103 .execute(&self.pool)
1104 .await
1105 .map(|_| ())
1106 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1107 }
1108
1109 pub async fn update_pool_last_synced_block(
1115 &self,
1116 chain_id: u32,
1117 dex: &DexType,
1118 pool_identifier: &PoolIdentifier,
1119 block_number: u64,
1120 ) -> anyhow::Result<()> {
1121 sqlx::query(
1122 r"
1123 UPDATE pool
1124 SET last_full_sync_block_number = $4
1125 WHERE chain_id = $1
1126 AND dex_name = $2
1127 AND pool_identifier = $3
1128 ",
1129 )
1130 .bind(chain_id as i32)
1131 .bind(dex.to_string())
1132 .bind(pool_identifier.as_ref())
1133 .bind(block_number as i64)
1134 .execute(&self.pool)
1135 .await
1136 .map(|_| ())
1137 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1138 }
1139
1140 pub async fn get_dex_last_synced_block(
1146 &self,
1147 chain_id: u32,
1148 dex: &DexType,
1149 ) -> anyhow::Result<Option<u64>> {
1150 let result = sqlx::query_as::<_, (Option<i64>,)>(
1151 r#"
1152 SELECT
1153 last_full_sync_pools_block_number
1154 FROM dex
1155 WHERE chain_id = $1
1156 AND name = $2
1157 "#,
1158 )
1159 .bind(chain_id as i32)
1160 .bind(dex.to_string())
1161 .fetch_optional(&self.pool)
1162 .await
1163 .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1164
1165 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1166 }
1167
1168 pub async fn get_pool_last_synced_block(
1174 &self,
1175 chain_id: u32,
1176 dex: &DexType,
1177 pool_identifier: &PoolIdentifier,
1178 ) -> anyhow::Result<Option<u64>> {
1179 let result = sqlx::query_as::<_, (Option<i64>,)>(
1180 r#"
1181 SELECT
1182 last_full_sync_block_number
1183 FROM pool
1184 WHERE chain_id = $1
1185 AND dex_name = $2
1186 AND pool_identifier = $3
1187 "#,
1188 )
1189 .bind(chain_id as i32)
1190 .bind(dex.to_string())
1191 .bind(pool_identifier.as_ref())
1192 .fetch_optional(&self.pool)
1193 .await
1194 .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1195
1196 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1197 }
1198
1199 pub async fn get_table_last_block(
1206 &self,
1207 chain_id: u32,
1208 table_name: &str,
1209 pool_identifier: &PoolIdentifier,
1210 ) -> anyhow::Result<Option<u64>> {
1211 let query = format!(
1212 "SELECT MAX(block) FROM {table_name} WHERE chain_id = $1 AND pool_identifier = $2"
1213 );
1214 let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1215 .bind(chain_id as i32)
1216 .bind(pool_identifier.as_ref())
1217 .fetch_optional(&self.pool)
1218 .await
1219 .map_err(|e| anyhow::anyhow!("Failed to get table last block for {table_name}: {e}"))?;
1220
1221 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1222 }
1223
1224 pub async fn add_pool_collects_batch(
1230 &self,
1231 chain_id: u32,
1232 collects: &[PoolFeeCollect],
1233 ) -> anyhow::Result<()> {
1234 if collects.is_empty() {
1235 return Ok(());
1236 }
1237
1238 let len = collects.len();
1240 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1241 let mut dex_names: Vec<String> = Vec::with_capacity(len);
1242 let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1243 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1244 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1245 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1246 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1247 let mut owners: Vec<String> = Vec::with_capacity(len);
1248 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1249 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1250 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1251 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1252
1253 for collect in collects {
1255 chain_ids.push(chain_id as i32);
1256 dex_names.push(collect.dex.name.to_string());
1257 pool_identifiers.push(collect.pool_identifier.to_string());
1258 blocks.push(collect.block as i64);
1259 transaction_hashes.push(collect.transaction_hash.clone());
1260 transaction_indices.push(collect.transaction_index as i32);
1261 log_indices.push(collect.log_index as i32);
1262 owners.push(collect.owner.to_string());
1263 amount0s.push(collect.amount0.to_string());
1264 amount1s.push(collect.amount1.to_string());
1265 tick_lowers.push(collect.tick_lower);
1266 tick_uppers.push(collect.tick_upper);
1267 }
1268
1269 sqlx::query(
1271 r"
1272 INSERT INTO pool_collect_event (
1273 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1274 log_index, owner, amount0, amount1, tick_lower, tick_upper
1275 )
1276 SELECT
1277 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1278 log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1279 FROM UNNEST(
1280 $1::INT[], $2::TEXT[], $3::TEXT[], $4::INT[], $5::TEXT[], $6::INT[],
1281 $7::INT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::INT[], $12::INT[]
1282 ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1283 log_index, owner, amount0, amount1, tick_lower, tick_upper)
1284 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1285 ",
1286 )
1287 .bind(&chain_ids[..])
1288 .bind(&dex_names[..])
1289 .bind(&pool_identifiers[..])
1290 .bind(&blocks[..])
1291 .bind(&transaction_hashes[..])
1292 .bind(&transaction_indices[..])
1293 .bind(&log_indices[..])
1294 .bind(&owners[..])
1295 .bind(&amount0s[..])
1296 .bind(&amount1s[..])
1297 .bind(&tick_lowers[..])
1298 .bind(&tick_uppers[..])
1299 .execute(&self.pool)
1300 .await
1301 .map(|_| ())
1302 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1303 }
1304
1305 pub async fn add_pool_flash_batch(
1311 &self,
1312 chain_id: u32,
1313 flash_events: &[PoolFlash],
1314 ) -> anyhow::Result<()> {
1315 if flash_events.is_empty() {
1316 return Ok(());
1317 }
1318
1319 let len = flash_events.len();
1321 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1322 let mut dex_names: Vec<String> = Vec::with_capacity(len);
1323 let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1324 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1325 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1326 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1327 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1328 let mut senders: Vec<String> = Vec::with_capacity(len);
1329 let mut recipients: Vec<String> = Vec::with_capacity(len);
1330 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1331 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1332 let mut paid0s: Vec<String> = Vec::with_capacity(len);
1333 let mut paid1s: Vec<String> = Vec::with_capacity(len);
1334
1335 for flash in flash_events {
1337 chain_ids.push(chain_id as i32);
1338 dex_names.push(flash.dex.name.to_string());
1339 pool_identifiers.push(flash.pool_identifier.to_string());
1340 blocks.push(flash.block as i64);
1341 transaction_hashes.push(flash.transaction_hash.clone());
1342 transaction_indices.push(flash.transaction_index as i32);
1343 log_indices.push(flash.log_index as i32);
1344 senders.push(flash.sender.to_string());
1345 recipients.push(flash.recipient.to_string());
1346 amount0s.push(flash.amount0.to_string());
1347 amount1s.push(flash.amount1.to_string());
1348 paid0s.push(flash.paid0.to_string());
1349 paid1s.push(flash.paid1.to_string());
1350 }
1351
1352 sqlx::query(
1354 r"
1355 INSERT INTO pool_flash_event (
1356 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1357 log_index, sender, recipient, amount0, amount1, paid0, paid1
1358 )
1359 SELECT
1360 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1361 log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1362 FROM UNNEST(
1363 $1::INT[], $2::TEXT[], $3::TEXT[], $4::INT[], $5::TEXT[], $6::INT[],
1364 $7::INT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[], $13::TEXT[]
1365 ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1366 log_index, sender, recipient, amount0, amount1, paid0, paid1)
1367 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1368 ",
1369 )
1370 .bind(&chain_ids[..])
1371 .bind(&dex_names[..])
1372 .bind(&pool_identifiers[..])
1373 .bind(&blocks[..])
1374 .bind(&transaction_hashes[..])
1375 .bind(&transaction_indices[..])
1376 .bind(&log_indices[..])
1377 .bind(&senders[..])
1378 .bind(&recipients[..])
1379 .bind(&amount0s[..])
1380 .bind(&amount1s[..])
1381 .bind(&paid0s[..])
1382 .bind(&paid1s[..])
1383 .execute(&self.pool)
1384 .await
1385 .map(|_| ())
1386 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1387 }
1388
1389 pub async fn add_pool_snapshot(
1395 &self,
1396 chain_id: u32,
1397 dex_name: &DexType,
1398 pool_identifier: &PoolIdentifier,
1399 snapshot: &PoolSnapshot,
1400 ) -> anyhow::Result<()> {
1401 sqlx::query(
1402 r"
1403 INSERT INTO pool_snapshot (
1404 chain_id, dex_name, pool_identifier, block, transaction_index, log_index, transaction_hash,
1405 current_tick, price_sqrt_ratio_x96, liquidity,
1406 protocol_fees_token0, protocol_fees_token1, fee_protocol,
1407 fee_growth_global_0, fee_growth_global_1,
1408 total_amount0_deposited, total_amount1_deposited,
1409 total_amount0_collected, total_amount1_collected,
1410 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1411 liquidity_utilization_rate
1412 ) VALUES (
1413 $1, $2, $3, $4, $5, $6, $7,
1414 $8, $9::U160, $10::U128, $11::U256, $12::U256, $13,
1415 $14::U256, $15::U256, $16::U256, $17::U256, $18::U256, $19::U256,
1416 $20, $21, $22, $23, $24, $25
1417 )
1418 ON CONFLICT (chain_id, pool_identifier, block, transaction_index, log_index)
1419 DO NOTHING
1420 ",
1421 )
1422 .bind(chain_id as i32)
1423 .bind(dex_name.to_string())
1424 .bind(pool_identifier.as_ref())
1425 .bind(snapshot.block_position.number as i64)
1426 .bind(snapshot.block_position.transaction_index as i32)
1427 .bind(snapshot.block_position.log_index as i32)
1428 .bind(snapshot.block_position.transaction_hash.clone())
1429 .bind(snapshot.state.current_tick)
1430 .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1431 .bind(snapshot.state.liquidity.to_string())
1432 .bind(snapshot.state.protocol_fees_token0.to_string())
1433 .bind(snapshot.state.protocol_fees_token1.to_string())
1434 .bind(snapshot.state.fee_protocol as i16)
1435 .bind(snapshot.state.fee_growth_global_0.to_string())
1436 .bind(snapshot.state.fee_growth_global_1.to_string())
1437 .bind(snapshot.analytics.total_amount0_deposited.to_string())
1438 .bind(snapshot.analytics.total_amount1_deposited.to_string())
1439 .bind(snapshot.analytics.total_amount0_collected.to_string())
1440 .bind(snapshot.analytics.total_amount1_collected.to_string())
1441 .bind(snapshot.analytics.total_swaps as i32)
1442 .bind(snapshot.analytics.total_mints as i32)
1443 .bind(snapshot.analytics.total_burns as i32)
1444 .bind(snapshot.analytics.total_fee_collects as i32)
1445 .bind(snapshot.analytics.total_flashes as i32)
1446 .bind(snapshot.analytics.liquidity_utilization_rate)
1447 .execute(&self.pool)
1448 .await
1449 .map(|_| ())
1450 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1451 }
1452
1453 pub async fn add_pool_positions_batch(
1459 &self,
1460 chain_id: u32,
1461 snapshot_block: u64,
1462 snapshot_transaction_index: u32,
1463 snapshot_log_index: u32,
1464 positions: &[(PoolIdentifier, PoolPosition)],
1465 ) -> anyhow::Result<()> {
1466 if positions.is_empty() {
1467 return Ok(());
1468 }
1469
1470 let len = positions.len();
1472 let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1473 let mut owners: Vec<String> = Vec::with_capacity(len);
1474 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1475 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1476 let mut liquidities: Vec<String> = Vec::with_capacity(len);
1477 let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1478 let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1479 let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1480 let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1481 let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1482 let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1483 let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1484 let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1485
1486 for (pool_address, position) in positions {
1488 pool_identifiers.push(pool_address.to_string());
1489 owners.push(position.owner.to_string());
1490 tick_lowers.push(position.tick_lower);
1491 tick_uppers.push(position.tick_upper);
1492 liquidities.push(position.liquidity.to_string());
1493 fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1494 fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1495 tokens_owed_0s.push(position.tokens_owed_0.to_string());
1496 tokens_owed_1s.push(position.tokens_owed_1.to_string());
1497 total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1498 total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1499 total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1500 total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1501 }
1502
1503 sqlx::query(
1505 r"
1506 INSERT INTO pool_position (
1507 chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1508 owner, tick_lower, tick_upper,
1509 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1510 tokens_owed_0, tokens_owed_1,
1511 total_amount0_deposited, total_amount1_deposited,
1512 total_amount0_collected, total_amount1_collected
1513 )
1514 SELECT
1515 $1, pool_identifier, $2, $3, $4,
1516 owner, tick_lower, tick_upper,
1517 liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1518 tokens_owed_0::U128, tokens_owed_1::U128,
1519 total_amount0_deposited::U256, total_amount1_deposited::U256,
1520 total_amount0_collected::U128, total_amount1_collected::U128
1521 FROM UNNEST(
1522 $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1523 $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1524 $16::TEXT[], $17::TEXT[]
1525 ) AS t(pool_identifier, owner, tick_lower, tick_upper,
1526 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1527 tokens_owed_0, tokens_owed_1,
1528 total_amount0_deposited, total_amount1_deposited,
1529 total_amount0_collected, total_amount1_collected)
1530 ON CONFLICT (chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1531 DO NOTHING
1532 ",
1533 )
1534 .bind(chain_id as i32)
1535 .bind(snapshot_block as i64)
1536 .bind(snapshot_transaction_index as i32)
1537 .bind(snapshot_log_index as i32)
1538 .bind(&pool_identifiers[..])
1539 .bind(&owners[..])
1540 .bind(&tick_lowers[..])
1541 .bind(&tick_uppers[..])
1542 .bind(&liquidities[..])
1543 .bind(&fee_growth_inside_0_lasts[..])
1544 .bind(&fee_growth_inside_1_lasts[..])
1545 .bind(&tokens_owed_0s[..])
1546 .bind(&tokens_owed_1s[..])
1547 .bind(&total_amount0_depositeds as &[Option<String>])
1548 .bind(&total_amount1_depositeds as &[Option<String>])
1549 .bind(&total_amount0_collecteds as &[Option<String>])
1550 .bind(&total_amount1_collecteds as &[Option<String>])
1551 .execute(&self.pool)
1552 .await
1553 .map(|_| ())
1554 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1555 }
1556
1557 pub async fn add_pool_ticks_batch(
1563 &self,
1564 chain_id: u32,
1565 snapshot_block: u64,
1566 snapshot_transaction_index: u32,
1567 snapshot_log_index: u32,
1568 ticks: &[(PoolIdentifier, &PoolTick)],
1569 ) -> anyhow::Result<()> {
1570 if ticks.is_empty() {
1571 return Ok(());
1572 }
1573
1574 let len = ticks.len();
1576 let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1577 let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1578 let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1579 let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1580 let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1581 let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1582 let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1583 let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1584
1585 for (pool_address, tick) in ticks {
1587 pool_identifiers.push(pool_address.to_string());
1588 tick_values.push(tick.value);
1589 liquidity_grosses.push(tick.liquidity_gross.to_string());
1590 liquidity_nets.push(tick.liquidity_net.to_string());
1591 fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1592 fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1593 initializeds.push(tick.initialized);
1594 last_updated_blocks.push(tick.last_updated_block as i64);
1595 }
1596
1597 sqlx::query(
1599 r"
1600 INSERT INTO pool_tick (
1601 chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1602 tick_value, liquidity_gross, liquidity_net,
1603 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1604 )
1605 SELECT
1606 $1, pool_identifier, $2, $3, $4,
1607 tick_value, liquidity_gross::U128, liquidity_net::I128,
1608 fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1609 FROM UNNEST(
1610 $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1611 $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1612 ) AS t(pool_identifier, tick_value, liquidity_gross, liquidity_net,
1613 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1614 ON CONFLICT (chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1615 DO NOTHING
1616 ",
1617 )
1618 .bind(chain_id as i32)
1619 .bind(snapshot_block as i64)
1620 .bind(snapshot_transaction_index as i32)
1621 .bind(snapshot_log_index as i32)
1622 .bind(&pool_identifiers[..])
1623 .bind(&tick_values[..])
1624 .bind(&liquidity_grosses[..])
1625 .bind(&liquidity_nets[..])
1626 .bind(&fee_growth_outside_0s[..])
1627 .bind(&fee_growth_outside_1s[..])
1628 .bind(&initializeds[..])
1629 .bind(&last_updated_blocks[..])
1630 .execute(&self.pool)
1631 .await
1632 .map(|_| ())
1633 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1634 }
1635
1636 pub async fn update_pool_initial_price_tick(
1642 &self,
1643 chain_id: u32,
1644 initialize_event: &InitializeEvent,
1645 ) -> anyhow::Result<()> {
1646 sqlx::query(
1647 r"
1648 UPDATE pool
1649 SET
1650 initial_tick = $4,
1651 initial_sqrt_price_x96 = $5
1652 WHERE chain_id = $1
1653 AND dex_name = $2
1654 AND pool_identifier = $3
1655 ",
1656 )
1657 .bind(chain_id as i32)
1658 .bind(initialize_event.dex.name.to_string())
1659 .bind(initialize_event.pool_identifier.as_ref())
1660 .bind(initialize_event.tick)
1661 .bind(initialize_event.sqrt_price_x96.to_string())
1662 .execute(&self.pool)
1663 .await
1664 .map(|_| ())
1665 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1666 }
1667
1668 pub async fn load_latest_valid_pool_snapshot(
1676 &self,
1677 chain_id: u32,
1678 pool_identifier: &PoolIdentifier,
1679 ) -> anyhow::Result<Option<PoolSnapshot>> {
1680 let result = sqlx::query(
1681 r"
1682 SELECT
1683 block, transaction_index, log_index, transaction_hash,
1684 current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1685 protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1686 fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1687 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1688 total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1689 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1690 liquidity_utilization_rate,
1691 (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1692 FROM pool_snapshot
1693 WHERE chain_id = $1 AND pool_identifier = $2 AND is_valid = TRUE
1694 ORDER BY block DESC, transaction_index DESC, log_index DESC
1695 LIMIT 1
1696 ",
1697 )
1698 .bind(chain_id as i32)
1699 .bind(pool_identifier.as_ref())
1700 .fetch_optional(&self.pool)
1701 .await
1702 .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {e}"))?;
1703
1704 if let Some(row) = result {
1705 let block: i64 = row.get("block");
1707 let transaction_index: i32 = row.get("transaction_index");
1708 let log_index: i32 = row.get("log_index");
1709 let transaction_hash: String = row.get("transaction_hash");
1710
1711 let block_position = BlockPosition::new(
1712 block as u64,
1713 transaction_hash,
1714 transaction_index as u32,
1715 log_index as u32,
1716 );
1717
1718 let state = PoolState {
1719 current_tick: row.get("current_tick"),
1720 price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1721 liquidity: row.get::<String, _>("liquidity").parse()?,
1722 protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1723 protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1724 fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1725 fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1726 fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1727 };
1728
1729 let analytics = PoolAnalytics {
1730 total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1731 total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1732 total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1733 total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1734 total_swaps: row.get::<i32, _>("total_swaps") as u64,
1735 total_mints: row.get::<i32, _>("total_mints") as u64,
1736 total_burns: row.get::<i32, _>("total_burns") as u64,
1737 total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1738 total_flashes: row.get::<i32, _>("total_flashes") as u64,
1739 liquidity_utilization_rate: row.get::<f64, _>("liquidity_utilization_rate"),
1740 };
1741
1742 let positions = self
1744 .load_pool_positions_for_snapshot(
1745 chain_id,
1746 pool_identifier,
1747 block as u64,
1748 transaction_index as u32,
1749 log_index as u32,
1750 )
1751 .await?;
1752
1753 let ticks = self
1754 .load_pool_ticks_for_snapshot(
1755 chain_id,
1756 pool_identifier,
1757 block as u64,
1758 transaction_index as u32,
1759 log_index as u32,
1760 )
1761 .await?;
1762
1763 let dex_name: String = row.get("dex_name");
1764 let chain = Chain::from_chain_id(chain_id)
1765 .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {chain_id}"))?;
1766
1767 let dex_type = DexType::from_dex_name(&dex_name)
1768 .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {dex_name}"))?;
1769
1770 let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1771 .ok_or_else(|| {
1772 anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1773 })?;
1774
1775 let instrument_id =
1776 Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_identifier.as_ref());
1777
1778 Ok(Some(PoolSnapshot::new(
1779 instrument_id,
1780 state,
1781 positions,
1782 ticks,
1783 analytics,
1784 block_position,
1785 )))
1786 } else {
1787 Ok(None)
1788 }
1789 }
1790
1791 pub async fn mark_pool_snapshot_valid(
1797 &self,
1798 chain_id: u32,
1799 pool_identifier: &PoolIdentifier,
1800 block: u64,
1801 transaction_index: u32,
1802 log_index: u32,
1803 ) -> anyhow::Result<()> {
1804 sqlx::query(
1805 r"
1806 UPDATE pool_snapshot
1807 SET is_valid = TRUE
1808 WHERE chain_id = $1
1809 AND pool_identifier = $2
1810 AND block = $3
1811 AND transaction_index = $4
1812 AND log_index = $5
1813 ",
1814 )
1815 .bind(chain_id as i32)
1816 .bind(pool_identifier.as_ref())
1817 .bind(block as i64)
1818 .bind(transaction_index as i32)
1819 .bind(log_index as i32)
1820 .execute(&self.pool)
1821 .await
1822 .map(|_| ())
1823 .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {e}"))
1824 }
1825
1826 pub async fn load_pool_positions_for_snapshot(
1832 &self,
1833 chain_id: u32,
1834 pool_identifier: &PoolIdentifier,
1835 snapshot_block: u64,
1836 snapshot_transaction_index: u32,
1837 snapshot_log_index: u32,
1838 ) -> anyhow::Result<Vec<PoolPosition>> {
1839 let rows = sqlx::query(
1840 r"
1841 SELECT
1842 owner, tick_lower, tick_upper,
1843 liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1844 tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1845 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1846 total_amount0_collected::TEXT, total_amount1_collected::TEXT
1847 FROM pool_position
1848 WHERE chain_id = $1
1849 AND pool_identifier = $2
1850 AND snapshot_block = $3
1851 AND snapshot_transaction_index = $4
1852 AND snapshot_log_index = $5
1853 ",
1854 )
1855 .bind(chain_id as i32)
1856 .bind(pool_identifier.as_ref())
1857 .bind(snapshot_block as i64)
1858 .bind(snapshot_transaction_index as i32)
1859 .bind(snapshot_log_index as i32)
1860 .fetch_all(&self.pool)
1861 .await
1862 .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {e}"))?;
1863
1864 rows.iter()
1865 .map(|row| {
1866 let owner: String = row.get("owner");
1867 let position = PoolPosition {
1868 owner: validate_address(&owner)?,
1869 tick_lower: row.get("tick_lower"),
1870 tick_upper: row.get("tick_upper"),
1871 liquidity: row.get::<String, _>("liquidity").parse()?,
1872 fee_growth_inside_0_last: row
1873 .get::<String, _>("fee_growth_inside_0_last")
1874 .parse()?,
1875 fee_growth_inside_1_last: row
1876 .get::<String, _>("fee_growth_inside_1_last")
1877 .parse()?,
1878 tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1879 tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1880 total_amount0_deposited: row
1881 .get::<String, _>("total_amount0_deposited")
1882 .parse()?,
1883 total_amount1_deposited: row
1884 .get::<String, _>("total_amount1_deposited")
1885 .parse()?,
1886 total_amount0_collected: row
1887 .get::<String, _>("total_amount0_collected")
1888 .parse()?,
1889 total_amount1_collected: row
1890 .get::<String, _>("total_amount1_collected")
1891 .parse()?,
1892 };
1893 Ok(position)
1894 })
1895 .collect()
1896 }
1897
1898 pub async fn load_pool_ticks_for_snapshot(
1904 &self,
1905 chain_id: u32,
1906 pool_identifier: &PoolIdentifier,
1907 snapshot_block: u64,
1908 snapshot_transaction_index: u32,
1909 snapshot_log_index: u32,
1910 ) -> anyhow::Result<Vec<PoolTick>> {
1911 let rows = sqlx::query(
1912 r"
1913 SELECT
1914 tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1915 fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1916 last_updated_block
1917 FROM pool_tick
1918 WHERE chain_id = $1
1919 AND pool_identifier = $2
1920 AND snapshot_block = $3
1921 AND snapshot_transaction_index = $4
1922 AND snapshot_log_index = $5
1923 ",
1924 )
1925 .bind(chain_id as i32)
1926 .bind(pool_identifier.as_ref())
1927 .bind(snapshot_block as i64)
1928 .bind(snapshot_transaction_index as i32)
1929 .bind(snapshot_log_index as i32)
1930 .fetch_all(&self.pool)
1931 .await
1932 .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {e}"))?;
1933
1934 rows.iter()
1935 .map(|row| {
1936 let tick = PoolTick::new(
1937 row.get("tick_value"),
1938 row.get::<String, _>("liquidity_gross").parse()?,
1939 row.get::<String, _>("liquidity_net").parse()?,
1940 row.get::<String, _>("fee_growth_outside_0").parse()?,
1941 row.get::<String, _>("fee_growth_outside_1").parse()?,
1942 row.get("initialized"),
1943 row.get::<i64, _>("last_updated_block") as u64,
1944 );
1945 Ok(tick)
1946 })
1947 .collect()
1948 }
1949
1950 pub fn stream_pool_events<'a>(
1963 &'a self,
1964 chain: SharedChain,
1965 dex: SharedDex,
1966 instrument_id: InstrumentId,
1967 pool_identifier: PoolIdentifier,
1968 from_position: Option<BlockPosition>,
1969 ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1970 const QUERY_ALL: &str = r"
1972 (SELECT
1973 'swap' as event_type,
1974 chain_id,
1975 pool_identifier,
1976 block,
1977 transaction_hash,
1978 transaction_index,
1979 log_index,
1980 sender,
1981 recipient,
1982 NULL::TEXT as owner,
1983 sqrt_price_x96::TEXT,
1984 liquidity::TEXT as swap_liquidity,
1985 tick as swap_tick,
1986 amount0::TEXT as swap_amount0,
1987 amount1::TEXT as swap_amount1,
1988 NULL::TEXT as position_liquidity,
1989 NULL::TEXT as amount0,
1990 NULL::TEXT as amount1,
1991 NULL::INT as tick_lower,
1992 NULL::INT as tick_upper,
1993 NULL::TEXT as liquidity_event_type,
1994 NULL::TEXT as flash_amount0,
1995 NULL::TEXT as flash_amount1,
1996 NULL::TEXT as flash_paid0,
1997 NULL::TEXT as flash_paid1
1998 FROM pool_swap_event
1999 WHERE chain_id = $1 AND pool_identifier = $2)
2000 UNION ALL
2001 (SELECT
2002 'liquidity' as event_type,
2003 chain_id,
2004 pool_identifier,
2005 block,
2006 transaction_hash,
2007 transaction_index,
2008 log_index,
2009 sender,
2010 NULL::TEXT as recipient,
2011 owner,
2012 NULL::text as sqrt_price_x96,
2013 NULL::TEXT as swap_liquidity,
2014 NULL::INT as swap_tick,
2015 amount0::TEXT as swap_amount0,
2016 amount1::TEXT as swap_amount1,
2017 position_liquidity::TEXT,
2018 amount0::TEXT,
2019 amount1::TEXT,
2020 tick_lower::INT,
2021 tick_upper::INT,
2022 event_type as liquidity_event_type,
2023 NULL::TEXT as flash_amount0,
2024 NULL::TEXT as flash_amount1,
2025 NULL::TEXT as flash_paid0,
2026 NULL::TEXT as flash_paid1
2027 FROM pool_liquidity_event
2028 WHERE chain_id = $1 AND pool_identifier = $2)
2029 UNION ALL
2030 (SELECT
2031 'collect' as event_type,
2032 chain_id,
2033 pool_identifier,
2034 block,
2035 transaction_hash,
2036 transaction_index,
2037 log_index,
2038 NULL::TEXT as sender,
2039 NULL::TEXT as recipient,
2040 owner,
2041 NULL::TEXT as sqrt_price_x96,
2042 NULL::TEXT as swap_liquidity,
2043 NULL::INT AS swap_tick,
2044 amount0::TEXT as swap_amount0,
2045 amount1::TEXT as swap_amount1,
2046 NULL::TEXT as position_liquidity,
2047 amount0::TEXT,
2048 amount1::TEXT,
2049 tick_lower::INT,
2050 tick_upper::INT,
2051 NULL::TEXT as liquidity_event_type,
2052 NULL::TEXT as flash_amount0,
2053 NULL::TEXT as flash_amount1,
2054 NULL::TEXT as flash_paid0,
2055 NULL::TEXT as flash_paid1
2056 FROM pool_collect_event
2057 WHERE chain_id = $1 AND pool_identifier = $2)
2058 UNION ALL
2059 (SELECT
2060 'flash' as event_type,
2061 chain_id,
2062 pool_identifier,
2063 block,
2064 transaction_hash,
2065 transaction_index,
2066 log_index,
2067 sender,
2068 recipient,
2069 NULL::TEXT as owner,
2070 NULL::TEXT as sqrt_price_x96,
2071 NULL::TEXT as swap_liquidity,
2072 NULL::INT AS swap_tick,
2073 NULL::TEXT as swap_amount0,
2074 NULL::TEXT as swap_amount1,
2075 NULL::TEXT as position_liquidity,
2076 NULL::TEXT as amount0,
2077 NULL::TEXT as amount1,
2078 NULL::INT as tick_lower,
2079 NULL::INT as tick_upper,
2080 NULL::TEXT as liquidity_event_type,
2081 amount0::TEXT as flash_amount0,
2082 amount1::TEXT as flash_amount1,
2083 paid0::TEXT as flash_paid0,
2084 paid1::TEXT as flash_paid1
2085 FROM pool_flash_event
2086 WHERE chain_id = $1 AND pool_identifier = $2)
2087 ORDER BY block, transaction_index, log_index";
2088
2089 const QUERY_FROM_POSITION: &str = r"
2091 (SELECT
2092 'swap' as event_type,
2093 chain_id,
2094 pool_identifier,
2095 block,
2096 transaction_hash,
2097 transaction_index,
2098 log_index,
2099 sender,
2100 recipient,
2101 NULL::TEXT as owner,
2102 sqrt_price_x96::TEXT,
2103 liquidity::TEXT as swap_liquidity,
2104 tick as swap_tick,
2105 amount0::TEXT as swap_amount0,
2106 amount1::TEXT as swap_amount1,
2107 NULL::TEXT as position_liquidity,
2108 NULL::TEXT as amount0,
2109 NULL::TEXT as amount1,
2110 NULL::INT as tick_lower,
2111 NULL::INT as tick_upper,
2112 NULL::TEXT as liquidity_event_type,
2113 NULL::TEXT as flash_amount0,
2114 NULL::TEXT as flash_amount1,
2115 NULL::TEXT as flash_paid0,
2116 NULL::TEXT as flash_paid1
2117 FROM pool_swap_event
2118 WHERE chain_id = $1 AND pool_identifier = $2
2119 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2120 UNION ALL
2121 (SELECT
2122 'liquidity' as event_type,
2123 chain_id,
2124 pool_identifier,
2125 block,
2126 transaction_hash,
2127 transaction_index,
2128 log_index,
2129 sender,
2130 NULL::TEXT as recipient,
2131 owner,
2132 NULL::text as sqrt_price_x96,
2133 NULL::TEXT as swap_liquidity,
2134 NULL::INT as swap_tick,
2135 amount0::TEXT as swap_amount0,
2136 amount1::TEXT as swap_amount1,
2137 position_liquidity::TEXT,
2138 amount0::TEXT,
2139 amount1::TEXT,
2140 tick_lower::INT,
2141 tick_upper::INT,
2142 event_type as liquidity_event_type,
2143 NULL::TEXT as flash_amount0,
2144 NULL::TEXT as flash_amount1,
2145 NULL::TEXT as flash_paid0,
2146 NULL::TEXT as flash_paid1
2147 FROM pool_liquidity_event
2148 WHERE chain_id = $1 AND pool_identifier = $2
2149 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2150 UNION ALL
2151 (SELECT
2152 'collect' as event_type,
2153 chain_id,
2154 pool_identifier,
2155 block,
2156 transaction_hash,
2157 transaction_index,
2158 log_index,
2159 NULL::TEXT as sender,
2160 NULL::TEXT as recipient,
2161 owner,
2162 NULL::TEXT as sqrt_price_x96,
2163 NULL::TEXT as swap_liquidity,
2164 NULL::INT AS swap_tick,
2165 amount0::TEXT as swap_amount0,
2166 amount1::TEXT as swap_amount1,
2167 NULL::TEXT as position_liquidity,
2168 amount0::TEXT,
2169 amount1::TEXT,
2170 tick_lower::INT,
2171 tick_upper::INT,
2172 NULL::TEXT as liquidity_event_type,
2173 NULL::TEXT as flash_amount0,
2174 NULL::TEXT as flash_amount1,
2175 NULL::TEXT as flash_paid0,
2176 NULL::TEXT as flash_paid1
2177 FROM pool_collect_event
2178 WHERE chain_id = $1 AND pool_identifier = $2
2179 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2180 UNION ALL
2181 (SELECT
2182 'flash' as event_type,
2183 chain_id,
2184 pool_identifier,
2185 block,
2186 transaction_hash,
2187 transaction_index,
2188 log_index,
2189 sender,
2190 recipient,
2191 NULL::TEXT as owner,
2192 NULL::TEXT as sqrt_price_x96,
2193 NULL::TEXT as swap_liquidity,
2194 NULL::INT AS swap_tick,
2195 NULL::TEXT as swap_amount0,
2196 NULL::TEXT as swap_amount1,
2197 NULL::TEXT as position_liquidity,
2198 NULL::TEXT as amount0,
2199 NULL::TEXT as amount1,
2200 NULL::INT as tick_lower,
2201 NULL::INT as tick_upper,
2202 NULL::TEXT as liquidity_event_type,
2203 amount0::TEXT as flash_amount0,
2204 amount1::TEXT as flash_amount1,
2205 paid0::TEXT as flash_paid0,
2206 paid1::TEXT as flash_paid1
2207 FROM pool_flash_event
2208 WHERE chain_id = $1 AND pool_identifier = $2
2209 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2210 ORDER BY block, transaction_index, log_index";
2211
2212 let query = if let Some(pos) = from_position {
2214 sqlx::query(QUERY_FROM_POSITION)
2215 .bind(chain.chain_id as i32)
2216 .bind(pool_identifier.to_string())
2217 .bind(pos.number as i64)
2218 .bind(pos.transaction_index as i32)
2219 .bind(pos.log_index as i32)
2220 .fetch(&self.pool)
2221 } else {
2222 sqlx::query(QUERY_ALL)
2223 .bind(chain.chain_id as i32)
2224 .bind(pool_identifier.to_string())
2225 .fetch(&self.pool)
2226 };
2227
2228 let stream = query.map(move |row_result| match row_result {
2230 Ok(row) => {
2231 transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2232 .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {e}"))
2233 }
2234 Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {e}")),
2235 });
2236
2237 Box::pin(stream)
2238 }
2239}