1use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21 defi::{
22 Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
23 data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24 pool_analysis::{
25 position::PoolPosition,
26 snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
27 },
28 tick_map::tick::PoolTick,
29 validation::validate_address,
30 },
31 identifiers::InstrumentId,
32};
33use sqlx::{PgPool, Row, postgres::PgConnectOptions};
34
35use crate::{
36 cache::{
37 consistency::CachedBlocksConsistencyStatus,
38 copy::PostgresCopyHandler,
39 rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
40 types::{U128Pg, U256Pg},
41 },
42 events::initialize::InitializeEvent,
43};
44
/// Database handle for the blockchain cache.
///
/// Every query exposed by this type is executed through the wrapped Postgres
/// connection pool.
#[derive(Debug)]
pub struct BlockchainCacheDatabase {
    /// Postgres connection pool used to run all statements.
    pool: PgPool,
}
51
52impl BlockchainCacheDatabase {
53 pub async fn init(pg_options: PgConnectOptions) -> Self {
59 let pool = sqlx::postgres::PgPoolOptions::new()
60 .max_connections(32) .min_connections(5) .acquire_timeout(std::time::Duration::from_secs(3))
63 .connect_with(pg_options)
64 .await
65 .expect("Error connecting to Postgres");
66 Self { pool }
67 }
68
69 pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
75 sqlx::query(
76 r"
77 INSERT INTO chain (
78 chain_id, name
79 ) VALUES ($1,$2)
80 ON CONFLICT (chain_id)
81 DO NOTHING
82 ",
83 )
84 .bind(chain.chain_id as i32)
85 .bind(chain.name.to_string())
86 .execute(&self.pool)
87 .await
88 .map(|_| ())
89 .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
90 }
91
92 pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
99 let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
100 .bind(chain.chain_id as i32)
101 .fetch_one(&self.pool)
102 .await
103 .map_err(|e| {
104 anyhow::anyhow!(
105 "Failed to call create_block_partition for chain {}: {e}",
106 chain.chain_id
107 )
108 })?;
109
110 Ok(result.0)
111 }
112
113 pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
120 let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
121 .bind(chain.chain_id as i32)
122 .fetch_one(&self.pool)
123 .await
124 .map_err(|e| {
125 anyhow::anyhow!(
126 "Failed to call create_token_partition for chain {}: {e}",
127 chain.chain_id
128 )
129 })?;
130
131 Ok(result.0)
132 }
133
134 pub async fn get_block_consistency_status(
140 &self,
141 chain: &Chain,
142 ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
143 tracing::info!("Fetching block consistency status");
144
145 let result: (i64, i64) = sqlx::query_as(
146 r"
147 SELECT
148 COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
149 get_last_continuous_block($1) as last_continuous_block
150 "
151 )
152 .bind(chain.chain_id as i32)
153 .fetch_one(&self.pool)
154 .await
155 .map_err(|e| {
156 anyhow::anyhow!(
157 "Failed to get block info for chain {}: {}",
158 chain.chain_id,
159 e
160 )
161 })?;
162
163 Ok(CachedBlocksConsistencyStatus::new(
164 result.0 as u64,
165 result.1 as u64,
166 ))
167 }
168
169 pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
175 sqlx::query(
176 r"
177 INSERT INTO block (
178 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
179 base_fee_per_gas, blob_gas_used, excess_blob_gas,
180 l1_gas_price, l1_gas_used, l1_fee_scalar
181 ) VALUES (
182 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
183 )
184 ON CONFLICT (chain_id, number)
185 DO UPDATE
186 SET
187 hash = $3,
188 parent_hash = $4,
189 miner = $5,
190 gas_limit = $6,
191 gas_used = $7,
192 timestamp = $8,
193 base_fee_per_gas = $9,
194 blob_gas_used = $10,
195 excess_blob_gas = $11,
196 l1_gas_price = $12,
197 l1_gas_used = $13,
198 l1_fee_scalar = $14
199 ",
200 )
201 .bind(chain_id as i32)
202 .bind(block.number as i64)
203 .bind(block.hash.as_str())
204 .bind(block.parent_hash.as_str())
205 .bind(block.miner.as_str())
206 .bind(block.gas_limit as i64)
207 .bind(block.gas_used as i64)
208 .bind(block.timestamp.to_string())
209 .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
210 .bind(block.blob_gas_used.as_ref().map(U256::to_string))
211 .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
212 .bind(block.l1_gas_price.as_ref().map(U256::to_string))
213 .bind(block.l1_gas_used.map(|v| v as i64))
214 .bind(block.l1_fee_scalar.map(|v| v as i64))
215 .execute(&self.pool)
216 .await
217 .map(|_| ())
218 .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
219 }
220
221 pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
227 if blocks.is_empty() {
228 return Ok(());
229 }
230
231 let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
233 let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
234 let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
235 let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
236 let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
237 let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
238 let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
239 let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
240 let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
241 let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242 let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243 let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
244 let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
245
246 for block in blocks {
248 numbers.push(block.number as i64);
249 hashes.push(block.hash.clone());
250 parent_hashes.push(block.parent_hash.clone());
251 miners.push(block.miner.to_string());
252 gas_limits.push(block.gas_limit as i64);
253 gas_useds.push(block.gas_used as i64);
254 timestamps.push(block.timestamp.to_string());
255 base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
256 blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
257 excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
258 l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
259 l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
260 l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
261 }
262
263 sqlx::query(
265 r"
266 INSERT INTO block (
267 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
268 base_fee_per_gas, blob_gas_used, excess_blob_gas,
269 l1_gas_price, l1_gas_used, l1_fee_scalar
270 )
271 SELECT
272 $1, *
273 FROM UNNEST(
274 $2::int8[], $3::text[], $4::text[], $5::text[],
275 $6::int8[], $7::int8[], $8::text[],
276 $9::text[], $10::text[], $11::text[],
277 $12::text[], $13::int8[], $14::int8[]
278 )
279 ON CONFLICT (chain_id, number) DO NOTHING
280 ",
281 )
282 .bind(chain_id as i32)
283 .bind(&numbers[..])
284 .bind(&hashes[..])
285 .bind(&parent_hashes[..])
286 .bind(&miners[..])
287 .bind(&gas_limits[..])
288 .bind(&gas_useds[..])
289 .bind(×tamps[..])
290 .bind(&base_fee_per_gases as &[Option<String>])
291 .bind(&blob_gas_useds as &[Option<String>])
292 .bind(&excess_blob_gases as &[Option<String>])
293 .bind(&l1_gas_prices as &[Option<String>])
294 .bind(&l1_gas_useds as &[Option<i64>])
295 .bind(&l1_fee_scalars as &[Option<i64>])
296 .execute(&self.pool)
297 .await
298 .map(|_| ())
299 .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
300 }
301
302 pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
311 let copy_handler = PostgresCopyHandler::new(&self.pool);
312 copy_handler.copy_blocks(chain_id, blocks).await
313 }
314
315 pub async fn add_tokens_copy(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
321 let copy_handler = PostgresCopyHandler::new(&self.pool);
322 copy_handler.copy_tokens(chain_id, tokens).await
323 }
324
325 pub async fn add_pools_copy(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
331 let copy_handler = PostgresCopyHandler::new(&self.pool);
332 copy_handler.copy_pools(chain_id, pools).await
333 }
334
335 pub async fn add_pool_swaps_copy(
344 &self,
345 chain_id: u32,
346 swaps: &[PoolSwap],
347 ) -> anyhow::Result<()> {
348 let copy_handler = PostgresCopyHandler::new(&self.pool);
349 copy_handler.copy_pool_swaps(chain_id, swaps).await
350 }
351
352 pub async fn add_pool_liquidity_updates_copy(
361 &self,
362 chain_id: u32,
363 updates: &[PoolLiquidityUpdate],
364 ) -> anyhow::Result<()> {
365 let copy_handler = PostgresCopyHandler::new(&self.pool);
366 copy_handler
367 .copy_pool_liquidity_updates(chain_id, updates)
368 .await
369 }
370
371 pub async fn copy_pool_fee_collects_batch(
380 &self,
381 chain_id: u32,
382 collects: &[PoolFeeCollect],
383 ) -> anyhow::Result<()> {
384 let copy_handler = PostgresCopyHandler::new(&self.pool);
385 copy_handler.copy_pool_collects(chain_id, collects).await
386 }
387
388 pub async fn load_block_timestamps(
394 &self,
395 chain: SharedChain,
396 from_block: u64,
397 ) -> anyhow::Result<Vec<BlockTimestampRow>> {
398 sqlx::query_as::<_, BlockTimestampRow>(
399 r"
400 SELECT
401 number,
402 timestamp
403 FROM block
404 WHERE chain_id = $1 AND number >= $2
405 ORDER BY number ASC
406 ",
407 )
408 .bind(chain.chain_id as i32)
409 .bind(from_block as i64)
410 .fetch_all(&self.pool)
411 .await
412 .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
413 }
414
415 pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
421 sqlx::query(
422 r"
423 INSERT INTO dex (
424 chain_id, name, factory_address, creation_block
425 ) VALUES ($1, $2, $3, $4)
426 ON CONFLICT (chain_id, name)
427 DO UPDATE
428 SET
429 factory_address = $3,
430 creation_block = $4
431 ",
432 )
433 .bind(dex.chain.chain_id as i32)
434 .bind(dex.name.to_string())
435 .bind(dex.factory.to_string())
436 .bind(dex.factory_creation_block as i64)
437 .execute(&self.pool)
438 .await
439 .map(|_| ())
440 .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
441 }
442
443 pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
449 sqlx::query(
450 r"
451 INSERT INTO pool (
452 chain_id, address, dex_name, creation_block,
453 token0_chain, token0_address,
454 token1_chain, token1_address,
455 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
456 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
457 ON CONFLICT (chain_id, address)
458 DO UPDATE
459 SET
460 dex_name = $3,
461 creation_block = $4,
462 token0_chain = $5,
463 token0_address = $6,
464 token1_chain = $7,
465 token1_address = $8,
466 fee = $9,
467 tick_spacing = $10,
468 initial_tick = $11,
469 initial_sqrt_price_x96 = $12
470 ",
471 )
472 .bind(pool.chain.chain_id as i32)
473 .bind(pool.address.to_string())
474 .bind(pool.dex.name.to_string())
475 .bind(pool.creation_block as i64)
476 .bind(pool.token0.chain.chain_id as i32)
477 .bind(pool.token0.address.to_string())
478 .bind(pool.token1.chain.chain_id as i32)
479 .bind(pool.token1.address.to_string())
480 .bind(pool.fee.map(|fee| fee as i32))
481 .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
482 .bind(pool.initial_tick)
483 .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
484 .execute(&self.pool)
485 .await
486 .map(|_| ())
487 .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
488 }
489
490 pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
496 if pools.is_empty() {
497 return Ok(());
498 }
499
500 let len = pools.len();
502 let mut addresses: Vec<String> = Vec::with_capacity(len);
503 let mut dex_names: Vec<String> = Vec::with_capacity(len);
504 let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
505 let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
506 let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
507 let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
508 let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
509 let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
510 let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
511 let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
512 let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
513 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
514
515 for pool in pools {
517 chain_ids.push(pool.chain.chain_id as i32);
518 addresses.push(pool.address.to_string());
519 dex_names.push(pool.dex.name.to_string());
520 creation_blocks.push(pool.creation_block as i64);
521 token0_chains.push(pool.token0.chain.chain_id as i32);
522 token0_addresses.push(pool.token0.address.to_string());
523 token1_chains.push(pool.token1.chain.chain_id as i32);
524 token1_addresses.push(pool.token1.address.to_string());
525 fees.push(pool.fee.map(|fee| fee as i32));
526 tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
527 initial_ticks.push(pool.initial_tick);
528 initial_sqrt_price_x96s
529 .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
530 }
531
532 sqlx::query(
534 r"
535 INSERT INTO pool (
536 chain_id, address, dex_name, creation_block,
537 token0_chain, token0_address,
538 token1_chain, token1_address,
539 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
540 )
541 SELECT *
542 FROM UNNEST(
543 $1::int4[], $2::text[], $3::text[], $4::int8[],
544 $5::int4[], $6::text[], $7::int4[], $8::text[],
545 $9::int4[], $10::int4[], $11::int4[], $12::text[]
546 )
547 ON CONFLICT (chain_id, address) DO NOTHING
548 ",
549 )
550 .bind(&chain_ids[..])
551 .bind(&addresses[..])
552 .bind(&dex_names[..])
553 .bind(&creation_blocks[..])
554 .bind(&token0_chains[..])
555 .bind(&token0_addresses[..])
556 .bind(&token1_chains[..])
557 .bind(&token1_addresses[..])
558 .bind(&fees[..])
559 .bind(&tick_spacings[..])
560 .bind(&initial_ticks[..])
561 .bind(&initial_sqrt_price_x96s[..])
562 .execute(&self.pool)
563 .await
564 .map(|_| ())
565 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
566 }
567
568 pub async fn add_pool_swaps_batch(
574 &self,
575 chain_id: u32,
576 swaps: &[PoolSwap],
577 ) -> anyhow::Result<()> {
578 if swaps.is_empty() {
579 return Ok(());
580 }
581
582 let len = swaps.len();
584 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
585 let mut blocks: Vec<i64> = Vec::with_capacity(len);
586 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
587 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
588 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
589 let mut senders: Vec<String> = Vec::with_capacity(len);
590 let mut recipients: Vec<String> = Vec::with_capacity(len);
591 let mut sides: Vec<Option<String>> = Vec::with_capacity(len);
592 let mut sizes: Vec<Option<String>> = Vec::with_capacity(len);
593 let mut prices: Vec<Option<String>> = Vec::with_capacity(len);
594 let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
595 let mut liquidities: Vec<String> = Vec::with_capacity(len);
596 let mut ticks: Vec<i32> = Vec::with_capacity(len);
597 let mut amount0s: Vec<String> = Vec::with_capacity(len);
598 let mut amount1s: Vec<String> = Vec::with_capacity(len);
599 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
600
601 for swap in swaps {
603 chain_ids.push(chain_id as i32);
604 pool_addresses.push(swap.pool_address.to_string());
605 blocks.push(swap.block as i64);
606 transaction_hashes.push(swap.transaction_hash.clone());
607 transaction_indices.push(swap.transaction_index as i32);
608 log_indices.push(swap.log_index as i32);
609 senders.push(swap.sender.to_string());
610 recipients.push(swap.recipient.to_string());
611 sides.push(swap.side.map(|side| side.to_string()));
612 sizes.push(swap.size.map(|size| size.to_string()));
613 prices.push(swap.price.map(|price| price.to_string()));
614 sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
615 liquidities.push(swap.liquidity.to_string());
616 ticks.push(swap.tick);
617 amount0s.push(swap.amount0.to_string());
618 amount1s.push(swap.amount1.to_string());
619 }
620
621 sqlx::query(
623 r"
624 INSERT INTO pool_swap_event (
625 chain_id, pool_address, block, transaction_hash, transaction_index,
626 log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1
627 )
628 SELECT
629 chain_id, pool_address, block, transaction_hash, transaction_index, log_index, sender, recipient,
630 side, size, price, sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256
631 FROM UNNEST(
632 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[], $6::INT[],
633 $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[],
634 $12::TEXT[], $13::TEXT[], $14::INT[], $15::TEXT[], $16::TEXT[]
635 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
636 log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1)
637 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
638 ",
639 )
640 .bind(&chain_ids[..])
641 .bind(&pool_addresses[..])
642 .bind(&blocks[..])
643 .bind(&transaction_hashes[..])
644 .bind(&transaction_indices[..])
645 .bind(&log_indices[..])
646 .bind(&senders[..])
647 .bind(&recipients[..])
648 .bind(&sides[..])
649 .bind(&sizes[..])
650 .bind(&prices[..])
651 .bind(&sqrt_price_x96s[..])
652 .bind(&liquidities[..])
653 .bind(&ticks[..])
654 .bind(&amount0s[..])
655 .bind(&amount1s[..])
656 .execute(&self.pool)
657 .await
658 .map(|_| ())
659 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
660 }
661
662 pub async fn add_pool_liquidity_updates_batch(
668 &self,
669 chain_id: u32,
670 updates: &[PoolLiquidityUpdate],
671 ) -> anyhow::Result<()> {
672 if updates.is_empty() {
673 return Ok(());
674 }
675
676 let len = updates.len();
678 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
679 let mut blocks: Vec<i64> = Vec::with_capacity(len);
680 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
681 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
682 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
683 let mut event_types: Vec<String> = Vec::with_capacity(len);
684 let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
685 let mut owners: Vec<String> = Vec::with_capacity(len);
686 let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
687 let mut amount0s: Vec<String> = Vec::with_capacity(len);
688 let mut amount1s: Vec<String> = Vec::with_capacity(len);
689 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
690 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
691 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
692
693 for update in updates {
695 chain_ids.push(chain_id as i32);
696 pool_addresses.push(update.pool_address.to_string());
697 blocks.push(update.block as i64);
698 transaction_hashes.push(update.transaction_hash.clone());
699 transaction_indices.push(update.transaction_index as i32);
700 log_indices.push(update.log_index as i32);
701 event_types.push(update.kind.to_string());
702 senders.push(update.sender.map(|s| s.to_string()));
703 owners.push(update.owner.to_string());
704 position_liquidities.push(update.position_liquidity.to_string());
705 amount0s.push(update.amount0.to_string());
706 amount1s.push(update.amount1.to_string());
707 tick_lowers.push(update.tick_lower);
708 tick_uppers.push(update.tick_upper);
709 }
710
711 sqlx::query(
713 r"
714 INSERT INTO pool_liquidity_event (
715 chain_id, pool_address, block, transaction_hash, transaction_index,
716 log_index, event_type, sender, owner, position_liquidity,
717 amount0, amount1, tick_lower, tick_upper
718 )
719 SELECT
720 chain_id, pool_address, block, transaction_hash, transaction_index,
721 log_index, event_type, sender, owner, position_liquidity::u128,
722 amount0::U256, amount1::U256, tick_lower, tick_upper
723 FROM UNNEST(
724 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
725 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[],
726 $11::TEXT[], $12::TEXT[], $13::INT[], $14::INT[]
727 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
728 log_index, event_type, sender, owner, position_liquidity,
729 amount0, amount1, tick_lower, tick_upper)
730 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
731 ",
732 )
733 .bind(&chain_ids[..])
734 .bind(&pool_addresses[..])
735 .bind(&blocks[..])
736 .bind(&transaction_hashes[..])
737 .bind(&transaction_indices[..])
738 .bind(&log_indices[..])
739 .bind(&event_types[..])
740 .bind(&senders[..])
741 .bind(&owners[..])
742 .bind(&position_liquidities[..])
743 .bind(&amount0s[..])
744 .bind(&amount1s[..])
745 .bind(&tick_lowers[..])
746 .bind(&tick_uppers[..])
747 .execute(&self.pool)
748 .await
749 .map(|_| ())
750 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
751 }
752
753 pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
759 sqlx::query(
760 r"
761 INSERT INTO token (
762 chain_id, address, name, symbol, decimals
763 ) VALUES ($1, $2, $3, $4, $5)
764 ON CONFLICT (chain_id, address)
765 DO UPDATE
766 SET
767 name = $3,
768 symbol = $4,
769 decimals = $5
770 ",
771 )
772 .bind(token.chain.chain_id as i32)
773 .bind(token.address.to_string())
774 .bind(token.name.as_str())
775 .bind(token.symbol.as_str())
776 .bind(i32::from(token.decimals))
777 .execute(&self.pool)
778 .await
779 .map(|_| ())
780 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
781 }
782
783 pub async fn add_invalid_token(
789 &self,
790 chain_id: u32,
791 address: &Address,
792 error_string: &str,
793 ) -> anyhow::Result<()> {
794 sqlx::query(
795 r"
796 INSERT INTO token (
797 chain_id, address, error
798 ) VALUES ($1, $2, $3)
799 ON CONFLICT (chain_id, address)
800 DO NOTHING;
801 ",
802 )
803 .bind(chain_id as i32)
804 .bind(address.to_string())
805 .bind(error_string)
806 .execute(&self.pool)
807 .await
808 .map(|_| ())
809 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
810 }
811
812 pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
818 sqlx::query(
819 r"
820 INSERT INTO pool_swap_event (
821 chain_id, pool_address, block, transaction_hash, transaction_index,
822 log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
823 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
824 ON CONFLICT (chain_id, transaction_hash, log_index)
825 DO NOTHING
826 ",
827 )
828 .bind(chain_id as i32)
829 .bind(swap.pool_address.to_string())
830 .bind(swap.block as i64)
831 .bind(swap.transaction_hash.as_str())
832 .bind(swap.transaction_index as i32)
833 .bind(swap.log_index as i32)
834 .bind(swap.sender.to_string())
835 .bind(swap.recipient.to_string())
836 .bind(swap.side.map(|side| side.to_string()))
837 .bind(swap.size.map(|size| size.to_string()))
838 .bind(swap.price.map(|price| price.to_string()))
839 .bind(swap.sqrt_price_x96.to_string())
840 .bind(swap.amount0.to_string())
841 .bind(swap.amount1.to_string())
842 .execute(&self.pool)
843 .await
844 .map(|_| ())
845 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
846 }
847
848 pub async fn add_pool_liquidity_update(
854 &self,
855 chain_id: u32,
856 liquidity_update: &PoolLiquidityUpdate,
857 ) -> anyhow::Result<()> {
858 sqlx::query(
859 r"
860 INSERT INTO pool_liquidity_event (
861 chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
862 event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
863 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
864 ON CONFLICT (chain_id, transaction_hash, log_index)
865 DO NOTHING
866 ",
867 )
868 .bind(chain_id as i32)
869 .bind(liquidity_update.pool_address.to_string())
870 .bind(liquidity_update.block as i64)
871 .bind(liquidity_update.transaction_hash.as_str())
872 .bind(liquidity_update.transaction_index as i32)
873 .bind(liquidity_update.log_index as i32)
874 .bind(liquidity_update.kind.to_string())
875 .bind(liquidity_update.sender.map(|sender| sender.to_string()))
876 .bind(liquidity_update.owner.to_string())
877 .bind(U128Pg(liquidity_update.position_liquidity))
878 .bind(U256Pg(liquidity_update.amount0))
879 .bind(U256Pg(liquidity_update.amount1))
880 .bind(liquidity_update.tick_lower)
881 .bind(liquidity_update.tick_upper)
882 .execute(&self.pool)
883 .await
884 .map(|_| ())
885 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
886 }
887
888 pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
897 sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
898 .bind(chain.chain_id as i32)
899 .fetch_all(&self.pool)
900 .await
901 .map(|rows| {
902 rows.into_iter()
903 .map(|token_row| {
904 Token::new(
905 chain.clone(),
906 token_row.address,
907 token_row.name,
908 token_row.symbol,
909 token_row.decimals as u8,
910 )
911 })
912 .collect::<Vec<_>>()
913 })
914 .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
915 }
916
917 pub async fn load_invalid_token_addresses(
923 &self,
924 chain_id: u32,
925 ) -> anyhow::Result<Vec<Address>> {
926 sqlx::query_as::<_, (String,)>(
927 "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
928 )
929 .bind(chain_id as i32)
930 .fetch_all(&self.pool)
931 .await?
932 .into_iter()
933 .map(|(address,)| validate_address(&address))
934 .collect::<Result<Vec<_>, _>>()
935 .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
936 }
937
938 pub async fn load_pools(
944 &self,
945 chain: SharedChain,
946 dex_id: &str,
947 ) -> anyhow::Result<Vec<PoolRow>> {
948 sqlx::query_as::<_, PoolRow>(
949 r"
950 SELECT
951 address,
952 dex_name,
953 creation_block,
954 token0_chain,
955 token0_address,
956 token1_chain,
957 token1_address,
958 fee,
959 tick_spacing,
960 initial_tick,
961 initial_sqrt_price_x96
962 FROM pool
963 WHERE chain_id = $1 AND dex_name = $2
964 ORDER BY creation_block ASC
965 ",
966 )
967 .bind(chain.chain_id as i32)
968 .bind(dex_id)
969 .fetch_all(&self.pool)
970 .await
971 .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
972 }
973
974 pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
988 if enable {
989 tracing::info!("Enabling performance sync settings for bulk operations");
990
991 sqlx::query("SET synchronous_commit = OFF")
993 .execute(&self.pool)
994 .await
995 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
996
997 sqlx::query("SET work_mem = '256MB'")
999 .execute(&self.pool)
1000 .await
1001 .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
1002
1003 tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
1004 } else {
1005 tracing::info!("Restoring default safe database performance settings");
1006
1007 sqlx::query("SET synchronous_commit = ON")
1009 .execute(&self.pool)
1010 .await
1011 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
1012
1013 sqlx::query("RESET work_mem")
1015 .execute(&self.pool)
1016 .await
1017 .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
1018 }
1019
1020 Ok(())
1021 }
1022
1023 pub async fn update_dex_last_synced_block(
1029 &self,
1030 chain_id: u32,
1031 dex: &DexType,
1032 block_number: u64,
1033 ) -> anyhow::Result<()> {
1034 sqlx::query(
1035 r"
1036 UPDATE dex
1037 SET last_full_sync_pools_block_number = $3
1038 WHERE chain_id = $1 AND name = $2
1039 ",
1040 )
1041 .bind(chain_id as i32)
1042 .bind(dex.to_string())
1043 .bind(block_number as i64)
1044 .execute(&self.pool)
1045 .await
1046 .map(|_| ())
1047 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1048 }
1049
1050 pub async fn update_pool_last_synced_block(
1051 &self,
1052 chain_id: u32,
1053 dex: &DexType,
1054 pool_address: &Address,
1055 block_number: u64,
1056 ) -> anyhow::Result<()> {
1057 sqlx::query(
1058 r"
1059 UPDATE pool
1060 SET last_full_sync_block_number = $4
1061 WHERE chain_id = $1
1062 AND dex_name = $2
1063 AND address = $3
1064 ",
1065 )
1066 .bind(chain_id as i32)
1067 .bind(dex.to_string())
1068 .bind(pool_address.to_string())
1069 .bind(block_number as i64)
1070 .execute(&self.pool)
1071 .await
1072 .map(|_| ())
1073 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1074 }
1075
1076 pub async fn get_dex_last_synced_block(
1082 &self,
1083 chain_id: u32,
1084 dex: &DexType,
1085 ) -> anyhow::Result<Option<u64>> {
1086 let result = sqlx::query_as::<_, (Option<i64>,)>(
1087 r#"
1088 SELECT
1089 last_full_sync_pools_block_number
1090 FROM dex
1091 WHERE chain_id = $1
1092 AND name = $2
1093 "#,
1094 )
1095 .bind(chain_id as i32)
1096 .bind(dex.to_string())
1097 .fetch_optional(&self.pool)
1098 .await
1099 .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1100
1101 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1102 }
1103
1104 pub async fn get_pool_last_synced_block(
1105 &self,
1106 chain_id: u32,
1107 dex: &DexType,
1108 pool_address: &Address,
1109 ) -> anyhow::Result<Option<u64>> {
1110 let result = sqlx::query_as::<_, (Option<i64>,)>(
1111 r#"
1112 SELECT
1113 last_full_sync_block_number
1114 FROM pool
1115 WHERE chain_id = $1
1116 AND dex_name = $2
1117 AND address = $3
1118 "#,
1119 )
1120 .bind(chain_id as i32)
1121 .bind(dex.to_string())
1122 .bind(pool_address.to_string())
1123 .fetch_optional(&self.pool)
1124 .await
1125 .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1126
1127 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1128 }
1129
1130 pub async fn get_table_last_block(
1137 &self,
1138 chain_id: u32,
1139 table_name: &str,
1140 pool_address: &Address,
1141 ) -> anyhow::Result<Option<u64>> {
1142 let query = format!(
1143 "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
1144 table_name
1145 );
1146 let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1147 .bind(chain_id as i32)
1148 .bind(pool_address.to_string())
1149 .fetch_optional(&self.pool)
1150 .await
1151 .map_err(|e| {
1152 anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
1153 })?;
1154
1155 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1156 }
1157
1158 pub async fn add_pool_collects_batch(
1164 &self,
1165 chain_id: u32,
1166 collects: &[PoolFeeCollect],
1167 ) -> anyhow::Result<()> {
1168 if collects.is_empty() {
1169 return Ok(());
1170 }
1171
1172 let len = collects.len();
1174 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1175 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1176 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1177 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1178 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1179 let mut owners: Vec<String> = Vec::with_capacity(len);
1180 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1181 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1182 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1183 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1184 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1185
1186 for collect in collects {
1188 chain_ids.push(chain_id as i32);
1189 pool_addresses.push(collect.pool_address.to_string());
1190 blocks.push(collect.block as i64);
1191 transaction_hashes.push(collect.transaction_hash.clone());
1192 transaction_indices.push(collect.transaction_index as i32);
1193 log_indices.push(collect.log_index as i32);
1194 owners.push(collect.owner.to_string());
1195 amount0s.push(collect.amount0.to_string());
1196 amount1s.push(collect.amount1.to_string());
1197 tick_lowers.push(collect.tick_lower);
1198 tick_uppers.push(collect.tick_upper);
1199 }
1200
1201 sqlx::query(
1203 r"
1204 INSERT INTO pool_collect_event (
1205 chain_id, pool_address, block, transaction_hash, transaction_index,
1206 log_index, owner, amount0, amount1, tick_lower, tick_upper
1207 )
1208 SELECT
1209 chain_id, pool_address, block, transaction_hash, transaction_index,
1210 log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1211 FROM UNNEST(
1212 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1213 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::INT[], $11::INT[]
1214 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1215 log_index, owner, amount0, amount1, tick_lower, tick_upper)
1216 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1217 ",
1218 )
1219 .bind(&chain_ids[..])
1220 .bind(&pool_addresses[..])
1221 .bind(&blocks[..])
1222 .bind(&transaction_hashes[..])
1223 .bind(&transaction_indices[..])
1224 .bind(&log_indices[..])
1225 .bind(&owners[..])
1226 .bind(&amount0s[..])
1227 .bind(&amount1s[..])
1228 .bind(&tick_lowers[..])
1229 .bind(&tick_uppers[..])
1230 .execute(&self.pool)
1231 .await
1232 .map(|_| ())
1233 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1234 }
1235
1236 pub async fn add_pool_flash_batch(
1242 &self,
1243 chain_id: u32,
1244 flash_events: &[PoolFlash],
1245 ) -> anyhow::Result<()> {
1246 if flash_events.is_empty() {
1247 return Ok(());
1248 }
1249
1250 let len = flash_events.len();
1252 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1253 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1254 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1255 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1256 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1257 let mut senders: Vec<String> = Vec::with_capacity(len);
1258 let mut recipients: Vec<String> = Vec::with_capacity(len);
1259 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1260 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1261 let mut paid0s: Vec<String> = Vec::with_capacity(len);
1262 let mut paid1s: Vec<String> = Vec::with_capacity(len);
1263 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1264
1265 for flash in flash_events {
1267 chain_ids.push(chain_id as i32);
1268 pool_addresses.push(flash.pool_address.to_string());
1269 blocks.push(flash.block as i64);
1270 transaction_hashes.push(flash.transaction_hash.clone());
1271 transaction_indices.push(flash.transaction_index as i32);
1272 log_indices.push(flash.log_index as i32);
1273 senders.push(flash.sender.to_string());
1274 recipients.push(flash.recipient.to_string());
1275 amount0s.push(flash.amount0.to_string());
1276 amount1s.push(flash.amount1.to_string());
1277 paid0s.push(flash.paid0.to_string());
1278 paid1s.push(flash.paid1.to_string());
1279 }
1280
1281 sqlx::query(
1283 r"
1284 INSERT INTO pool_flash_event (
1285 chain_id, pool_address, block, transaction_hash, transaction_index,
1286 log_index, sender, recipient, amount0, amount1, paid0, paid1
1287 )
1288 SELECT
1289 chain_id, pool_address, block, transaction_hash, transaction_index,
1290 log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1291 FROM UNNEST(
1292 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1293 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[]
1294 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1295 log_index, sender, recipient, amount0, amount1, paid0, paid1)
1296 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1297 ",
1298 )
1299 .bind(&chain_ids[..])
1300 .bind(&pool_addresses[..])
1301 .bind(&blocks[..])
1302 .bind(&transaction_hashes[..])
1303 .bind(&transaction_indices[..])
1304 .bind(&log_indices[..])
1305 .bind(&senders[..])
1306 .bind(&recipients[..])
1307 .bind(&amount0s[..])
1308 .bind(&amount1s[..])
1309 .bind(&paid0s[..])
1310 .bind(&paid1s[..])
1311 .execute(&self.pool)
1312 .await
1313 .map(|_| ())
1314 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1315 }
1316
1317 pub async fn add_pool_snapshot(
1318 &self,
1319 chain_id: u32,
1320 pool_address: &Address,
1321 snapshot: &PoolSnapshot,
1322 ) -> anyhow::Result<()> {
1323 sqlx::query(
1324 r"
1325 INSERT INTO pool_snapshot (
1326 chain_id, pool_address, block, transaction_index, log_index, transaction_hash,
1327 current_tick, price_sqrt_ratio_x96, liquidity,
1328 protocol_fees_token0, protocol_fees_token1, fee_protocol,
1329 fee_growth_global_0, fee_growth_global_1,
1330 total_amount0_deposited, total_amount1_deposited,
1331 total_amount0_collected, total_amount1_collected,
1332 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1333 liquidity_utilization_rate
1334 ) VALUES (
1335 $1, $2, $3, $4, $5, $6,
1336 $7, $8::U160, $9::U128, $10::U256, $11::U256, $12,
1337 $13::U256, $14::U256, $15::U256, $16::U256, $17::U256, $18::U256,
1338 $19, $20, $21, $22, $23, $24
1339 )
1340 ON CONFLICT (chain_id, pool_address, block, transaction_index, log_index)
1341 DO NOTHING
1342 ",
1343 )
1344 .bind(chain_id as i32)
1345 .bind(pool_address.to_string())
1346 .bind(snapshot.block_position.number as i64)
1347 .bind(snapshot.block_position.transaction_index as i32)
1348 .bind(snapshot.block_position.log_index as i32)
1349 .bind(snapshot.block_position.transaction_hash.to_string())
1350 .bind(snapshot.state.current_tick)
1351 .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1352 .bind(snapshot.state.liquidity.to_string())
1353 .bind(snapshot.state.protocol_fees_token0.to_string())
1354 .bind(snapshot.state.protocol_fees_token1.to_string())
1355 .bind(snapshot.state.fee_protocol as i16)
1356 .bind(snapshot.state.fee_growth_global_0.to_string())
1357 .bind(snapshot.state.fee_growth_global_1.to_string())
1358 .bind(snapshot.analytics.total_amount0_deposited.to_string())
1359 .bind(snapshot.analytics.total_amount1_deposited.to_string())
1360 .bind(snapshot.analytics.total_amount0_collected.to_string())
1361 .bind(snapshot.analytics.total_amount1_collected.to_string())
1362 .bind(snapshot.analytics.total_swaps as i32)
1363 .bind(snapshot.analytics.total_mints as i32)
1364 .bind(snapshot.analytics.total_burns as i32)
1365 .bind(snapshot.analytics.total_fee_collects as i32)
1366 .bind(snapshot.analytics.total_flashes as i32)
1367 .bind(snapshot.analytics.liquidity_utilization_rate)
1368 .execute(&self.pool)
1369 .await
1370 .map(|_| ())
1371 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1372 }
1373
1374 pub async fn add_pool_positions_batch(
1380 &self,
1381 chain_id: u32,
1382 snapshot_block: u64,
1383 snapshot_transaction_index: u32,
1384 snapshot_log_index: u32,
1385 positions: &[(Address, PoolPosition)],
1386 ) -> anyhow::Result<()> {
1387 if positions.is_empty() {
1388 return Ok(());
1389 }
1390
1391 let len = positions.len();
1393 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1394 let mut owners: Vec<String> = Vec::with_capacity(len);
1395 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1396 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1397 let mut liquidities: Vec<String> = Vec::with_capacity(len);
1398 let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1399 let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1400 let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1401 let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1402 let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1403 let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1404 let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1405 let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1406
1407 for (pool_address, position) in positions {
1409 pool_addresses.push(pool_address.to_string());
1410 owners.push(position.owner.to_string());
1411 tick_lowers.push(position.tick_lower);
1412 tick_uppers.push(position.tick_upper);
1413 liquidities.push(position.liquidity.to_string());
1414 fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1415 fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1416 tokens_owed_0s.push(position.tokens_owed_0.to_string());
1417 tokens_owed_1s.push(position.tokens_owed_1.to_string());
1418 total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1419 total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1420 total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1421 total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1422 }
1423
1424 sqlx::query(
1426 r"
1427 INSERT INTO pool_position (
1428 chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1429 owner, tick_lower, tick_upper,
1430 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1431 tokens_owed_0, tokens_owed_1,
1432 total_amount0_deposited, total_amount1_deposited,
1433 total_amount0_collected, total_amount1_collected
1434 )
1435 SELECT
1436 $1, pool_address, $2, $3, $4,
1437 owner, tick_lower, tick_upper,
1438 liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1439 tokens_owed_0::U128, tokens_owed_1::U128,
1440 total_amount0_deposited::U256, total_amount1_deposited::U256,
1441 total_amount0_collected::U128, total_amount1_collected::U128
1442 FROM UNNEST(
1443 $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1444 $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1445 $16::TEXT[], $17::TEXT[]
1446 ) AS t(pool_address, owner, tick_lower, tick_upper,
1447 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1448 tokens_owed_0, tokens_owed_1,
1449 total_amount0_deposited, total_amount1_deposited,
1450 total_amount0_collected, total_amount1_collected)
1451 ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1452 DO NOTHING
1453 ",
1454 )
1455 .bind(chain_id as i32)
1456 .bind(snapshot_block as i64)
1457 .bind(snapshot_transaction_index as i32)
1458 .bind(snapshot_log_index as i32)
1459 .bind(&pool_addresses[..])
1460 .bind(&owners[..])
1461 .bind(&tick_lowers[..])
1462 .bind(&tick_uppers[..])
1463 .bind(&liquidities[..])
1464 .bind(&fee_growth_inside_0_lasts[..])
1465 .bind(&fee_growth_inside_1_lasts[..])
1466 .bind(&tokens_owed_0s[..])
1467 .bind(&tokens_owed_1s[..])
1468 .bind(&total_amount0_depositeds as &[Option<String>])
1469 .bind(&total_amount1_depositeds as &[Option<String>])
1470 .bind(&total_amount0_collecteds as &[Option<String>])
1471 .bind(&total_amount1_collecteds as &[Option<String>])
1472 .execute(&self.pool)
1473 .await
1474 .map(|_| ())
1475 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1476 }
1477
1478 pub async fn add_pool_ticks_batch(
1484 &self,
1485 chain_id: u32,
1486 snapshot_block: u64,
1487 snapshot_transaction_index: u32,
1488 snapshot_log_index: u32,
1489 ticks: &[(Address, &PoolTick)],
1490 ) -> anyhow::Result<()> {
1491 if ticks.is_empty() {
1492 return Ok(());
1493 }
1494
1495 let len = ticks.len();
1497 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1498 let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1499 let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1500 let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1501 let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1502 let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1503 let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1504 let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1505
1506 for (pool_address, tick) in ticks {
1508 pool_addresses.push(pool_address.to_string());
1509 tick_values.push(tick.value);
1510 liquidity_grosses.push(tick.liquidity_gross.to_string());
1511 liquidity_nets.push(tick.liquidity_net.to_string());
1512 fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1513 fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1514 initializeds.push(tick.initialized);
1515 last_updated_blocks.push(tick.last_updated_block as i64);
1516 }
1517
1518 sqlx::query(
1520 r"
1521 INSERT INTO pool_tick (
1522 chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1523 tick_value, liquidity_gross, liquidity_net,
1524 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1525 )
1526 SELECT
1527 $1, pool_address, $2, $3, $4,
1528 tick_value, liquidity_gross::U128, liquidity_net::I128,
1529 fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1530 FROM UNNEST(
1531 $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1532 $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1533 ) AS t(pool_address, tick_value, liquidity_gross, liquidity_net,
1534 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1535 ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1536 DO NOTHING
1537 ",
1538 )
1539 .bind(chain_id as i32)
1540 .bind(snapshot_block as i64)
1541 .bind(snapshot_transaction_index as i32)
1542 .bind(snapshot_log_index as i32)
1543 .bind(&pool_addresses[..])
1544 .bind(&tick_values[..])
1545 .bind(&liquidity_grosses[..])
1546 .bind(&liquidity_nets[..])
1547 .bind(&fee_growth_outside_0s[..])
1548 .bind(&fee_growth_outside_1s[..])
1549 .bind(&initializeds[..])
1550 .bind(&last_updated_blocks[..])
1551 .execute(&self.pool)
1552 .await
1553 .map(|_| ())
1554 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1555 }
1556
1557 pub async fn update_pool_initial_price_tick(
1558 &self,
1559 chain_id: u32,
1560 initialize_event: &InitializeEvent,
1561 ) -> anyhow::Result<()> {
1562 sqlx::query(
1563 r"
1564 UPDATE pool
1565 SET
1566 initial_tick = $4,
1567 initial_sqrt_price_x96 = $5
1568 WHERE chain_id = $1
1569 AND dex_name = $2
1570 AND address = $3
1571 ",
1572 )
1573 .bind(chain_id as i32)
1574 .bind(initialize_event.dex.name.to_string())
1575 .bind(initialize_event.pool_address.to_string())
1576 .bind(initialize_event.tick)
1577 .bind(initialize_event.sqrt_price_x96.to_string())
1578 .execute(&self.pool)
1579 .await
1580 .map(|_| ())
1581 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1582 }
1583
1584 pub async fn load_latest_valid_pool_snapshot(
1592 &self,
1593 chain_id: u32,
1594 pool_address: &Address,
1595 ) -> anyhow::Result<Option<PoolSnapshot>> {
1596 let result = sqlx::query(
1597 r"
1598 SELECT
1599 block, transaction_index, log_index, transaction_hash,
1600 current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1601 protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1602 fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1603 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1604 total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1605 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1606 liquidity_utilization_rate,
1607 (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1608 FROM pool_snapshot
1609 WHERE chain_id = $1 AND pool_address = $2 AND is_valid = TRUE
1610 ORDER BY block DESC, transaction_index DESC, log_index DESC
1611 LIMIT 1
1612 ",
1613 )
1614 .bind(chain_id as i32)
1615 .bind(pool_address.to_string())
1616 .fetch_optional(&self.pool)
1617 .await
1618 .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {}", e))?;
1619
1620 if let Some(row) = result {
1621 let block: i64 = row.get("block");
1623 let transaction_index: i32 = row.get("transaction_index");
1624 let log_index: i32 = row.get("log_index");
1625 let transaction_hash: String = row.get("transaction_hash");
1626
1627 let block_position = nautilus_model::defi::data::block::BlockPosition::new(
1628 block as u64,
1629 transaction_hash,
1630 transaction_index as u32,
1631 log_index as u32,
1632 );
1633
1634 let state = PoolState {
1635 current_tick: row.get("current_tick"),
1636 price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1637 liquidity: row.get::<String, _>("liquidity").parse()?,
1638 protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1639 protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1640 fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1641 fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1642 fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1643 };
1644
1645 let analytics = PoolAnalytics {
1646 total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1647 total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1648 total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1649 total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1650 total_swaps: row.get::<i32, _>("total_swaps") as u64,
1651 total_mints: row.get::<i32, _>("total_mints") as u64,
1652 total_burns: row.get::<i32, _>("total_burns") as u64,
1653 total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1654 total_flashes: row.get::<i32, _>("total_flashes") as u64,
1655 liquidity_utilization_rate: row.get::<f64, _>("liquidity_utilization_rate"),
1656 };
1657
1658 let positions = self
1660 .load_pool_positions_for_snapshot(
1661 chain_id,
1662 pool_address,
1663 block as u64,
1664 transaction_index as u32,
1665 log_index as u32,
1666 )
1667 .await?;
1668
1669 let ticks = self
1670 .load_pool_ticks_for_snapshot(
1671 chain_id,
1672 pool_address,
1673 block as u64,
1674 transaction_index as u32,
1675 log_index as u32,
1676 )
1677 .await?;
1678
1679 let dex_name: String = row.get("dex_name");
1680 let chain = nautilus_model::defi::Chain::from_chain_id(chain_id)
1681 .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {}", chain_id))?;
1682
1683 let dex_type = nautilus_model::defi::DexType::from_dex_name(&dex_name)
1684 .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {}", dex_name))?;
1685
1686 let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1687 .ok_or_else(|| {
1688 anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1689 })?;
1690
1691 let instrument_id =
1692 Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_address);
1693
1694 Ok(Some(PoolSnapshot::new(
1695 instrument_id,
1696 state,
1697 positions,
1698 ticks,
1699 analytics,
1700 block_position,
1701 )))
1702 } else {
1703 Ok(None)
1704 }
1705 }
1706
1707 pub async fn mark_pool_snapshot_valid(
1713 &self,
1714 chain_id: u32,
1715 pool_address: &Address,
1716 block: u64,
1717 transaction_index: u32,
1718 log_index: u32,
1719 ) -> anyhow::Result<()> {
1720 sqlx::query(
1721 r"
1722 UPDATE pool_snapshot
1723 SET is_valid = TRUE
1724 WHERE chain_id = $1
1725 AND pool_address = $2
1726 AND block = $3
1727 AND transaction_index = $4
1728 AND log_index = $5
1729 ",
1730 )
1731 .bind(chain_id as i32)
1732 .bind(pool_address.to_string())
1733 .bind(block as i64)
1734 .bind(transaction_index as i32)
1735 .bind(log_index as i32)
1736 .execute(&self.pool)
1737 .await
1738 .map(|_| ())
1739 .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {}", e))
1740 }
1741
1742 pub async fn load_pool_positions_for_snapshot(
1748 &self,
1749 chain_id: u32,
1750 pool_address: &Address,
1751 snapshot_block: u64,
1752 snapshot_transaction_index: u32,
1753 snapshot_log_index: u32,
1754 ) -> anyhow::Result<Vec<PoolPosition>> {
1755 let rows = sqlx::query(
1756 r"
1757 SELECT
1758 owner, tick_lower, tick_upper,
1759 liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1760 tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1761 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1762 total_amount0_collected::TEXT, total_amount1_collected::TEXT
1763 FROM pool_position
1764 WHERE chain_id = $1
1765 AND pool_address = $2
1766 AND snapshot_block = $3
1767 AND snapshot_transaction_index = $4
1768 AND snapshot_log_index = $5
1769 ",
1770 )
1771 .bind(chain_id as i32)
1772 .bind(pool_address.to_string())
1773 .bind(snapshot_block as i64)
1774 .bind(snapshot_transaction_index as i32)
1775 .bind(snapshot_log_index as i32)
1776 .fetch_all(&self.pool)
1777 .await
1778 .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {}", e))?;
1779
1780 rows.iter()
1781 .map(|row| {
1782 let owner: String = row.get("owner");
1783 let position = PoolPosition {
1784 owner: validate_address(&owner)?,
1785 tick_lower: row.get("tick_lower"),
1786 tick_upper: row.get("tick_upper"),
1787 liquidity: row.get::<String, _>("liquidity").parse()?,
1788 fee_growth_inside_0_last: row
1789 .get::<String, _>("fee_growth_inside_0_last")
1790 .parse()?,
1791 fee_growth_inside_1_last: row
1792 .get::<String, _>("fee_growth_inside_1_last")
1793 .parse()?,
1794 tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1795 tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1796 total_amount0_deposited: row
1797 .get::<String, _>("total_amount0_deposited")
1798 .parse()?,
1799 total_amount1_deposited: row
1800 .get::<String, _>("total_amount1_deposited")
1801 .parse()?,
1802 total_amount0_collected: row
1803 .get::<String, _>("total_amount0_collected")
1804 .parse()?,
1805 total_amount1_collected: row
1806 .get::<String, _>("total_amount1_collected")
1807 .parse()?,
1808 };
1809 Ok(position)
1810 })
1811 .collect()
1812 }
1813
1814 pub async fn load_pool_ticks_for_snapshot(
1820 &self,
1821 chain_id: u32,
1822 pool_address: &Address,
1823 snapshot_block: u64,
1824 snapshot_transaction_index: u32,
1825 snapshot_log_index: u32,
1826 ) -> anyhow::Result<Vec<PoolTick>> {
1827 let rows = sqlx::query(
1828 r"
1829 SELECT
1830 tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1831 fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1832 last_updated_block
1833 FROM pool_tick
1834 WHERE chain_id = $1
1835 AND pool_address = $2
1836 AND snapshot_block = $3
1837 AND snapshot_transaction_index = $4
1838 AND snapshot_log_index = $5
1839 ",
1840 )
1841 .bind(chain_id as i32)
1842 .bind(pool_address.to_string())
1843 .bind(snapshot_block as i64)
1844 .bind(snapshot_transaction_index as i32)
1845 .bind(snapshot_log_index as i32)
1846 .fetch_all(&self.pool)
1847 .await
1848 .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {}", e))?;
1849
1850 rows.iter()
1851 .map(|row| {
1852 let tick = PoolTick::new(
1853 row.get("tick_value"),
1854 row.get::<String, _>("liquidity_gross").parse()?,
1855 row.get::<String, _>("liquidity_net").parse()?,
1856 row.get::<String, _>("fee_growth_outside_0").parse()?,
1857 row.get::<String, _>("fee_growth_outside_1").parse()?,
1858 row.get("initialized"),
1859 row.get::<i64, _>("last_updated_block") as u64,
1860 );
1861 Ok(tick)
1862 })
1863 .collect()
1864 }
1865
1866 pub fn stream_pool_events<'a>(
1879 &'a self,
1880 chain: SharedChain,
1881 dex: SharedDex,
1882 instrument_id: InstrumentId,
1883 pool_address: &Address,
1884 from_position: Option<BlockPosition>,
1885 ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1886 const QUERY_ALL: &str = r"
1888 (SELECT
1889 'swap' as event_type,
1890 chain_id,
1891 pool_address,
1892 block,
1893 transaction_hash,
1894 transaction_index,
1895 log_index,
1896 sender,
1897 recipient,
1898 NULL::TEXT as owner,
1899 side,
1900 size,
1901 price,
1902 sqrt_price_x96::TEXT,
1903 liquidity::TEXT as swap_liquidity,
1904 tick as swap_tick,
1905 amount0::TEXT as swap_amount0,
1906 amount1::TEXT as swap_amount1,
1907 NULL::TEXT as position_liquidity,
1908 NULL::TEXT as amount0,
1909 NULL::TEXT as amount1,
1910 NULL::INT as tick_lower,
1911 NULL::INT as tick_upper,
1912 NULL::TEXT as liquidity_event_type,
1913 NULL::TEXT as flash_amount0,
1914 NULL::TEXT as flash_amount1,
1915 NULL::TEXT as flash_paid0,
1916 NULL::TEXT as flash_paid1
1917 FROM pool_swap_event
1918 WHERE chain_id = $1 AND pool_address = $2)
1919 UNION ALL
1920 (SELECT
1921 'liquidity' as event_type,
1922 chain_id,
1923 pool_address,
1924 block,
1925 transaction_hash,
1926 transaction_index,
1927 log_index,
1928 sender,
1929 NULL::TEXT as recipient,
1930 owner,
1931 NULL::TEXT as side,
1932 NULL::TEXT as size,
1933 NULL::text as price,
1934 NULL::text as sqrt_price_x96,
1935 NULL::TEXT as swap_liquidity,
1936 NULL::INT as swap_tick,
1937 amount0::TEXT as swap_amount0,
1938 amount1::TEXT as swap_amount1,
1939 position_liquidity::TEXT,
1940 amount0::TEXT,
1941 amount1::TEXT,
1942 tick_lower::INT,
1943 tick_upper::INT,
1944 event_type as liquidity_event_type,
1945 NULL::TEXT as flash_amount0,
1946 NULL::TEXT as flash_amount1,
1947 NULL::TEXT as flash_paid0,
1948 NULL::TEXT as flash_paid1
1949 FROM pool_liquidity_event
1950 WHERE chain_id = $1 AND pool_address = $2)
1951 UNION ALL
1952 (SELECT
1953 'collect' as event_type,
1954 chain_id,
1955 pool_address,
1956 block,
1957 transaction_hash,
1958 transaction_index,
1959 log_index,
1960 NULL::TEXT as sender,
1961 NULL::TEXT as recipient,
1962 owner,
1963 NULL::TEXT as side,
1964 NULL::TEXT as size,
1965 NULL::TEXT as price,
1966 NULL::TEXT as sqrt_price_x96,
1967 NULL::TEXT as swap_liquidity,
1968 NULL::INT AS swap_tick,
1969 amount0::TEXT as swap_amount0,
1970 amount1::TEXT as swap_amount1,
1971 NULL::TEXT as position_liquidity,
1972 amount0::TEXT,
1973 amount1::TEXT,
1974 tick_lower::INT,
1975 tick_upper::INT,
1976 NULL::TEXT as liquidity_event_type,
1977 NULL::TEXT as flash_amount0,
1978 NULL::TEXT as flash_amount1,
1979 NULL::TEXT as flash_paid0,
1980 NULL::TEXT as flash_paid1
1981 FROM pool_collect_event
1982 WHERE chain_id = $1 AND pool_address = $2)
1983 UNION ALL
1984 (SELECT
1985 'flash' as event_type,
1986 chain_id,
1987 pool_address,
1988 block,
1989 transaction_hash,
1990 transaction_index,
1991 log_index,
1992 sender,
1993 recipient,
1994 NULL::TEXT as owner,
1995 NULL::TEXT as side,
1996 NULL::TEXT as size,
1997 NULL::TEXT as price,
1998 NULL::TEXT as sqrt_price_x96,
1999 NULL::TEXT as swap_liquidity,
2000 NULL::INT AS swap_tick,
2001 NULL::TEXT as swap_amount0,
2002 NULL::TEXT as swap_amount1,
2003 NULL::TEXT as position_liquidity,
2004 NULL::TEXT as amount0,
2005 NULL::TEXT as amount1,
2006 NULL::INT as tick_lower,
2007 NULL::INT as tick_upper,
2008 NULL::TEXT as liquidity_event_type,
2009 amount0::TEXT as flash_amount0,
2010 amount1::TEXT as flash_amount1,
2011 paid0::TEXT as flash_paid0,
2012 paid1::TEXT as flash_paid1
2013 FROM pool_flash_event
2014 WHERE chain_id = $1 AND pool_address = $2)
2015 ORDER BY block, transaction_index, log_index";
2016
2017 const QUERY_FROM_POSITION: &str = r"
2019 (SELECT
2020 'swap' as event_type,
2021 chain_id,
2022 pool_address,
2023 block,
2024 transaction_hash,
2025 transaction_index,
2026 log_index,
2027 sender,
2028 recipient,
2029 NULL::TEXT as owner,
2030 side,
2031 size,
2032 price,
2033 sqrt_price_x96::TEXT,
2034 liquidity::TEXT as swap_liquidity,
2035 tick as swap_tick,
2036 amount0::TEXT as swap_amount0,
2037 amount1::TEXT as swap_amount1,
2038 NULL::TEXT as position_liquidity,
2039 NULL::TEXT as amount0,
2040 NULL::TEXT as amount1,
2041 NULL::INT as tick_lower,
2042 NULL::INT as tick_upper,
2043 NULL::TEXT as liquidity_event_type,
2044 NULL::TEXT as flash_amount0,
2045 NULL::TEXT as flash_amount1,
2046 NULL::TEXT as flash_paid0,
2047 NULL::TEXT as flash_paid1
2048 FROM pool_swap_event
2049 WHERE chain_id = $1 AND pool_address = $2
2050 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2051 UNION ALL
2052 (SELECT
2053 'liquidity' as event_type,
2054 chain_id,
2055 pool_address,
2056 block,
2057 transaction_hash,
2058 transaction_index,
2059 log_index,
2060 sender,
2061 NULL::TEXT as recipient,
2062 owner,
2063 NULL::TEXT as side,
2064 NULL::TEXT as size,
2065 NULL::text as price,
2066 NULL::text as sqrt_price_x96,
2067 NULL::TEXT as swap_liquidity,
2068 NULL::INT as swap_tick,
2069 amount0::TEXT as swap_amount0,
2070 amount1::TEXT as swap_amount1,
2071 position_liquidity::TEXT,
2072 amount0::TEXT,
2073 amount1::TEXT,
2074 tick_lower::INT,
2075 tick_upper::INT,
2076 event_type as liquidity_event_type,
2077 NULL::TEXT as flash_amount0,
2078 NULL::TEXT as flash_amount1,
2079 NULL::TEXT as flash_paid0,
2080 NULL::TEXT as flash_paid1
2081 FROM pool_liquidity_event
2082 WHERE chain_id = $1 AND pool_address = $2
2083 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2084 UNION ALL
2085 (SELECT
2086 'collect' as event_type,
2087 chain_id,
2088 pool_address,
2089 block,
2090 transaction_hash,
2091 transaction_index,
2092 log_index,
2093 NULL::TEXT as sender,
2094 NULL::TEXT as recipient,
2095 owner,
2096 NULL::TEXT as side,
2097 NULL::TEXT as size,
2098 NULL::TEXT as price,
2099 NULL::TEXT as sqrt_price_x96,
2100 NULL::TEXT as swap_liquidity,
2101 NULL::INT AS swap_tick,
2102 amount0::TEXT as swap_amount0,
2103 amount1::TEXT as swap_amount1,
2104 NULL::TEXT as position_liquidity,
2105 amount0::TEXT,
2106 amount1::TEXT,
2107 tick_lower::INT,
2108 tick_upper::INT,
2109 NULL::TEXT as liquidity_event_type,
2110 NULL::TEXT as flash_amount0,
2111 NULL::TEXT as flash_amount1,
2112 NULL::TEXT as flash_paid0,
2113 NULL::TEXT as flash_paid1
2114 FROM pool_collect_event
2115 WHERE chain_id = $1 AND pool_address = $2
2116 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2117 UNION ALL
2118 (SELECT
2119 'flash' as event_type,
2120 chain_id,
2121 pool_address,
2122 block,
2123 transaction_hash,
2124 transaction_index,
2125 log_index,
2126 sender,
2127 recipient,
2128 NULL::TEXT as owner,
2129 NULL::TEXT as side,
2130 NULL::TEXT as size,
2131 NULL::TEXT as price,
2132 NULL::TEXT as sqrt_price_x96,
2133 NULL::TEXT as swap_liquidity,
2134 NULL::INT AS swap_tick,
2135 NULL::TEXT as swap_amount0,
2136 NULL::TEXT as swap_amount1,
2137 NULL::TEXT as position_liquidity,
2138 NULL::TEXT as amount0,
2139 NULL::TEXT as amount1,
2140 NULL::INT as tick_lower,
2141 NULL::INT as tick_upper,
2142 NULL::TEXT as liquidity_event_type,
2143 amount0::TEXT as flash_amount0,
2144 amount1::TEXT as flash_amount1,
2145 paid0::TEXT as flash_paid0,
2146 paid1::TEXT as flash_paid1
2147 FROM pool_flash_event
2148 WHERE chain_id = $1 AND pool_address = $2
2149 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2150 ORDER BY block, transaction_index, log_index";
2151
2152 let query = if let Some(pos) = from_position {
2154 sqlx::query(QUERY_FROM_POSITION)
2155 .bind(chain.chain_id as i32)
2156 .bind(pool_address.to_string())
2157 .bind(pos.number as i64)
2158 .bind(pos.transaction_index as i32)
2159 .bind(pos.log_index as i32)
2160 .fetch(&self.pool)
2161 } else {
2162 sqlx::query(QUERY_ALL)
2163 .bind(chain.chain_id as i32)
2164 .bind(pool_address.to_string())
2165 .fetch(&self.pool)
2166 };
2167
2168 let stream = query.map(move |row_result| match row_result {
2170 Ok(row) => {
2171 transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2172 .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e))
2173 }
2174 Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {}", e)),
2175 });
2176
2177 Box::pin(stream)
2178 }
2179}