1use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21 defi::{
22 Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
23 data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24 pool_analysis::{
25 position::PoolPosition,
26 snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
27 },
28 tick_map::tick::PoolTick,
29 validation::validate_address,
30 },
31 identifiers::InstrumentId,
32};
33use sqlx::{PgPool, Row, postgres::PgConnectOptions};
34
35use crate::{
36 cache::{
37 consistency::CachedBlocksConsistencyStatus,
38 copy::PostgresCopyHandler,
39 rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
40 types::{U128Pg, U256Pg},
41 },
42 events::initialize::InitializeEvent,
43};
44
/// Database access layer for the blockchain cache, backed by Postgres.
///
/// Every query method on this type is executed against the wrapped `sqlx` connection pool.
#[derive(Debug)]
pub struct BlockchainCacheDatabase {
    /// Connection pool used to execute all statements.
    pool: PgPool,
}
51
52impl BlockchainCacheDatabase {
53 pub async fn init(pg_options: PgConnectOptions) -> Self {
59 let pool = sqlx::postgres::PgPoolOptions::new()
60 .max_connections(32) .min_connections(5) .acquire_timeout(std::time::Duration::from_secs(3))
63 .connect_with(pg_options)
64 .await
65 .expect("Error connecting to Postgres");
66 Self { pool }
67 }
68
69 pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
75 sqlx::query(
76 r"
77 INSERT INTO chain (
78 chain_id, name
79 ) VALUES ($1,$2)
80 ON CONFLICT (chain_id)
81 DO NOTHING
82 ",
83 )
84 .bind(chain.chain_id as i32)
85 .bind(chain.name.to_string())
86 .execute(&self.pool)
87 .await
88 .map(|_| ())
89 .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
90 }
91
92 pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
99 let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
100 .bind(chain.chain_id as i32)
101 .fetch_one(&self.pool)
102 .await
103 .map_err(|e| {
104 anyhow::anyhow!(
105 "Failed to call create_block_partition for chain {}: {e}",
106 chain.chain_id
107 )
108 })?;
109
110 Ok(result.0)
111 }
112
113 pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
120 let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
121 .bind(chain.chain_id as i32)
122 .fetch_one(&self.pool)
123 .await
124 .map_err(|e| {
125 anyhow::anyhow!(
126 "Failed to call create_token_partition for chain {}: {e}",
127 chain.chain_id
128 )
129 })?;
130
131 Ok(result.0)
132 }
133
134 pub async fn get_block_consistency_status(
140 &self,
141 chain: &Chain,
142 ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
143 tracing::info!("Fetching block consistency status");
144
145 let result: (i64, i64) = sqlx::query_as(
146 r"
147 SELECT
148 COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
149 get_last_continuous_block($1) as last_continuous_block
150 "
151 )
152 .bind(chain.chain_id as i32)
153 .fetch_one(&self.pool)
154 .await
155 .map_err(|e| {
156 anyhow::anyhow!(
157 "Failed to get block info for chain {}: {}",
158 chain.chain_id,
159 e
160 )
161 })?;
162
163 Ok(CachedBlocksConsistencyStatus::new(
164 result.0 as u64,
165 result.1 as u64,
166 ))
167 }
168
169 pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
175 sqlx::query(
176 r"
177 INSERT INTO block (
178 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
179 base_fee_per_gas, blob_gas_used, excess_blob_gas,
180 l1_gas_price, l1_gas_used, l1_fee_scalar
181 ) VALUES (
182 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
183 )
184 ON CONFLICT (chain_id, number)
185 DO UPDATE
186 SET
187 hash = $3,
188 parent_hash = $4,
189 miner = $5,
190 gas_limit = $6,
191 gas_used = $7,
192 timestamp = $8,
193 base_fee_per_gas = $9,
194 blob_gas_used = $10,
195 excess_blob_gas = $11,
196 l1_gas_price = $12,
197 l1_gas_used = $13,
198 l1_fee_scalar = $14
199 ",
200 )
201 .bind(chain_id as i32)
202 .bind(block.number as i64)
203 .bind(block.hash.as_str())
204 .bind(block.parent_hash.as_str())
205 .bind(block.miner.as_str())
206 .bind(block.gas_limit as i64)
207 .bind(block.gas_used as i64)
208 .bind(block.timestamp.to_string())
209 .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
210 .bind(block.blob_gas_used.as_ref().map(U256::to_string))
211 .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
212 .bind(block.l1_gas_price.as_ref().map(U256::to_string))
213 .bind(block.l1_gas_used.map(|v| v as i64))
214 .bind(block.l1_fee_scalar.map(|v| v as i64))
215 .execute(&self.pool)
216 .await
217 .map(|_| ())
218 .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
219 }
220
221 pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
227 if blocks.is_empty() {
228 return Ok(());
229 }
230
231 let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
233 let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
234 let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
235 let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
236 let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
237 let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
238 let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
239 let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
240 let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
241 let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242 let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243 let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
244 let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
245
246 for block in blocks {
248 numbers.push(block.number as i64);
249 hashes.push(block.hash.clone());
250 parent_hashes.push(block.parent_hash.clone());
251 miners.push(block.miner.to_string());
252 gas_limits.push(block.gas_limit as i64);
253 gas_useds.push(block.gas_used as i64);
254 timestamps.push(block.timestamp.to_string());
255 base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
256 blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
257 excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
258 l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
259 l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
260 l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
261 }
262
263 sqlx::query(
265 r"
266 INSERT INTO block (
267 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
268 base_fee_per_gas, blob_gas_used, excess_blob_gas,
269 l1_gas_price, l1_gas_used, l1_fee_scalar
270 )
271 SELECT
272 $1, *
273 FROM UNNEST(
274 $2::int8[], $3::text[], $4::text[], $5::text[],
275 $6::int8[], $7::int8[], $8::text[],
276 $9::text[], $10::text[], $11::text[],
277 $12::text[], $13::int8[], $14::int8[]
278 )
279 ON CONFLICT (chain_id, number) DO NOTHING
280 ",
281 )
282 .bind(chain_id as i32)
283 .bind(&numbers[..])
284 .bind(&hashes[..])
285 .bind(&parent_hashes[..])
286 .bind(&miners[..])
287 .bind(&gas_limits[..])
288 .bind(&gas_useds[..])
289 .bind(×tamps[..])
290 .bind(&base_fee_per_gases as &[Option<String>])
291 .bind(&blob_gas_useds as &[Option<String>])
292 .bind(&excess_blob_gases as &[Option<String>])
293 .bind(&l1_gas_prices as &[Option<String>])
294 .bind(&l1_gas_useds as &[Option<i64>])
295 .bind(&l1_fee_scalars as &[Option<i64>])
296 .execute(&self.pool)
297 .await
298 .map(|_| ())
299 .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
300 }
301
302 pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
311 let copy_handler = PostgresCopyHandler::new(&self.pool);
312 copy_handler.copy_blocks(chain_id, blocks).await
313 }
314
315 pub async fn add_pool_swaps_copy(
324 &self,
325 chain_id: u32,
326 swaps: &[PoolSwap],
327 ) -> anyhow::Result<()> {
328 let copy_handler = PostgresCopyHandler::new(&self.pool);
329 copy_handler.copy_pool_swaps(chain_id, swaps).await
330 }
331
332 pub async fn add_pool_liquidity_updates_copy(
341 &self,
342 chain_id: u32,
343 updates: &[PoolLiquidityUpdate],
344 ) -> anyhow::Result<()> {
345 let copy_handler = PostgresCopyHandler::new(&self.pool);
346 copy_handler
347 .copy_pool_liquidity_updates(chain_id, updates)
348 .await
349 }
350
351 pub async fn copy_pool_fee_collects_batch(
360 &self,
361 chain_id: u32,
362 collects: &[PoolFeeCollect],
363 ) -> anyhow::Result<()> {
364 let copy_handler = PostgresCopyHandler::new(&self.pool);
365 copy_handler.copy_pool_collects(chain_id, collects).await
366 }
367
368 pub async fn load_block_timestamps(
374 &self,
375 chain: SharedChain,
376 from_block: u64,
377 ) -> anyhow::Result<Vec<BlockTimestampRow>> {
378 sqlx::query_as::<_, BlockTimestampRow>(
379 r"
380 SELECT
381 number,
382 timestamp
383 FROM block
384 WHERE chain_id = $1 AND number >= $2
385 ORDER BY number ASC
386 ",
387 )
388 .bind(chain.chain_id as i32)
389 .bind(from_block as i64)
390 .fetch_all(&self.pool)
391 .await
392 .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
393 }
394
395 pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
401 sqlx::query(
402 r"
403 INSERT INTO dex (
404 chain_id, name, factory_address, creation_block
405 ) VALUES ($1, $2, $3, $4)
406 ON CONFLICT (chain_id, name)
407 DO UPDATE
408 SET
409 factory_address = $3,
410 creation_block = $4
411 ",
412 )
413 .bind(dex.chain.chain_id as i32)
414 .bind(dex.name.to_string())
415 .bind(dex.factory.to_string())
416 .bind(dex.factory_creation_block as i64)
417 .execute(&self.pool)
418 .await
419 .map(|_| ())
420 .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
421 }
422
423 pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
429 sqlx::query(
430 r"
431 INSERT INTO pool (
432 chain_id, address, dex_name, creation_block,
433 token0_chain, token0_address,
434 token1_chain, token1_address,
435 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
436 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
437 ON CONFLICT (chain_id, address)
438 DO UPDATE
439 SET
440 dex_name = $3,
441 creation_block = $4,
442 token0_chain = $5,
443 token0_address = $6,
444 token1_chain = $7,
445 token1_address = $8,
446 fee = $9,
447 tick_spacing = $10,
448 initial_tick = $11,
449 initial_sqrt_price_x96 = $12
450 ",
451 )
452 .bind(pool.chain.chain_id as i32)
453 .bind(pool.address.to_string())
454 .bind(pool.dex.name.to_string())
455 .bind(pool.creation_block as i64)
456 .bind(pool.token0.chain.chain_id as i32)
457 .bind(pool.token0.address.to_string())
458 .bind(pool.token1.chain.chain_id as i32)
459 .bind(pool.token1.address.to_string())
460 .bind(pool.fee.map(|fee| fee as i32))
461 .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
462 .bind(pool.initial_tick)
463 .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
464 .execute(&self.pool)
465 .await
466 .map(|_| ())
467 .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
468 }
469
470 pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
476 if pools.is_empty() {
477 return Ok(());
478 }
479
480 let len = pools.len();
482 let mut addresses: Vec<String> = Vec::with_capacity(len);
483 let mut dex_names: Vec<String> = Vec::with_capacity(len);
484 let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
485 let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
486 let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
487 let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
488 let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
489 let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
490 let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
491 let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
492 let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
493 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
494
495 for pool in pools {
497 chain_ids.push(pool.chain.chain_id as i32);
498 addresses.push(pool.address.to_string());
499 dex_names.push(pool.dex.name.to_string());
500 creation_blocks.push(pool.creation_block as i64);
501 token0_chains.push(pool.token0.chain.chain_id as i32);
502 token0_addresses.push(pool.token0.address.to_string());
503 token1_chains.push(pool.token1.chain.chain_id as i32);
504 token1_addresses.push(pool.token1.address.to_string());
505 fees.push(pool.fee.map(|fee| fee as i32));
506 tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
507 initial_ticks.push(pool.initial_tick);
508 initial_sqrt_price_x96s
509 .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
510 }
511
512 sqlx::query(
514 r"
515 INSERT INTO pool (
516 chain_id, address, dex_name, creation_block,
517 token0_chain, token0_address,
518 token1_chain, token1_address,
519 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
520 )
521 SELECT *
522 FROM UNNEST(
523 $1::int4[], $2::text[], $3::text[], $4::int8[],
524 $5::int4[], $6::text[], $7::int4[], $8::text[],
525 $9::int4[], $10::int4[], $11::int4[], $12::text[]
526 )
527 ON CONFLICT (chain_id, address) DO NOTHING
528 ",
529 )
530 .bind(&chain_ids[..])
531 .bind(&addresses[..])
532 .bind(&dex_names[..])
533 .bind(&creation_blocks[..])
534 .bind(&token0_chains[..])
535 .bind(&token0_addresses[..])
536 .bind(&token1_chains[..])
537 .bind(&token1_addresses[..])
538 .bind(&fees[..])
539 .bind(&tick_spacings[..])
540 .bind(&initial_ticks[..])
541 .bind(&initial_sqrt_price_x96s[..])
542 .execute(&self.pool)
543 .await
544 .map(|_| ())
545 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
546 }
547
548 pub async fn add_pool_swaps_batch(
554 &self,
555 chain_id: u32,
556 swaps: &[PoolSwap],
557 ) -> anyhow::Result<()> {
558 if swaps.is_empty() {
559 return Ok(());
560 }
561
562 let len = swaps.len();
564 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
565 let mut blocks: Vec<i64> = Vec::with_capacity(len);
566 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
567 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
568 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
569 let mut senders: Vec<String> = Vec::with_capacity(len);
570 let mut recipients: Vec<String> = Vec::with_capacity(len);
571 let mut sides: Vec<Option<String>> = Vec::with_capacity(len);
572 let mut sizes: Vec<Option<String>> = Vec::with_capacity(len);
573 let mut prices: Vec<Option<String>> = Vec::with_capacity(len);
574 let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
575 let mut liquidities: Vec<String> = Vec::with_capacity(len);
576 let mut ticks: Vec<i32> = Vec::with_capacity(len);
577 let mut amount0s: Vec<String> = Vec::with_capacity(len);
578 let mut amount1s: Vec<String> = Vec::with_capacity(len);
579 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
580
581 for swap in swaps {
583 chain_ids.push(chain_id as i32);
584 pool_addresses.push(swap.pool_address.to_string());
585 blocks.push(swap.block as i64);
586 transaction_hashes.push(swap.transaction_hash.clone());
587 transaction_indices.push(swap.transaction_index as i32);
588 log_indices.push(swap.log_index as i32);
589 senders.push(swap.sender.to_string());
590 recipients.push(swap.recipient.to_string());
591 sides.push(swap.side.map(|side| side.to_string()));
592 sizes.push(swap.size.map(|size| size.to_string()));
593 prices.push(swap.price.map(|price| price.to_string()));
594 sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
595 liquidities.push(swap.liquidity.to_string());
596 ticks.push(swap.tick);
597 amount0s.push(swap.amount0.to_string());
598 amount1s.push(swap.amount1.to_string());
599 }
600
601 sqlx::query(
603 r"
604 INSERT INTO pool_swap_event (
605 chain_id, pool_address, block, transaction_hash, transaction_index,
606 log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1
607 )
608 SELECT
609 chain_id, pool_address, block, transaction_hash, transaction_index, log_index, sender, recipient,
610 side, size, price, sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256
611 FROM UNNEST(
612 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[], $6::INT[],
613 $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[],
614 $12::TEXT[], $13::TEXT[], $14::INT[], $15::TEXT[], $16::TEXT[]
615 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
616 log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1)
617 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
618 ",
619 )
620 .bind(&chain_ids[..])
621 .bind(&pool_addresses[..])
622 .bind(&blocks[..])
623 .bind(&transaction_hashes[..])
624 .bind(&transaction_indices[..])
625 .bind(&log_indices[..])
626 .bind(&senders[..])
627 .bind(&recipients[..])
628 .bind(&sides[..])
629 .bind(&sizes[..])
630 .bind(&prices[..])
631 .bind(&sqrt_price_x96s[..])
632 .bind(&liquidities[..])
633 .bind(&ticks[..])
634 .bind(&amount0s[..])
635 .bind(&amount1s[..])
636 .execute(&self.pool)
637 .await
638 .map(|_| ())
639 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
640 }
641
642 pub async fn add_pool_liquidity_updates_batch(
648 &self,
649 chain_id: u32,
650 updates: &[PoolLiquidityUpdate],
651 ) -> anyhow::Result<()> {
652 if updates.is_empty() {
653 return Ok(());
654 }
655
656 let len = updates.len();
658 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
659 let mut blocks: Vec<i64> = Vec::with_capacity(len);
660 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
661 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
662 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
663 let mut event_types: Vec<String> = Vec::with_capacity(len);
664 let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
665 let mut owners: Vec<String> = Vec::with_capacity(len);
666 let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
667 let mut amount0s: Vec<String> = Vec::with_capacity(len);
668 let mut amount1s: Vec<String> = Vec::with_capacity(len);
669 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
670 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
671 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
672
673 for update in updates {
675 chain_ids.push(chain_id as i32);
676 pool_addresses.push(update.pool_address.to_string());
677 blocks.push(update.block as i64);
678 transaction_hashes.push(update.transaction_hash.clone());
679 transaction_indices.push(update.transaction_index as i32);
680 log_indices.push(update.log_index as i32);
681 event_types.push(update.kind.to_string());
682 senders.push(update.sender.map(|s| s.to_string()));
683 owners.push(update.owner.to_string());
684 position_liquidities.push(update.position_liquidity.to_string());
685 amount0s.push(update.amount0.to_string());
686 amount1s.push(update.amount1.to_string());
687 tick_lowers.push(update.tick_lower);
688 tick_uppers.push(update.tick_upper);
689 }
690
691 sqlx::query(
693 r"
694 INSERT INTO pool_liquidity_event (
695 chain_id, pool_address, block, transaction_hash, transaction_index,
696 log_index, event_type, sender, owner, position_liquidity,
697 amount0, amount1, tick_lower, tick_upper
698 )
699 SELECT
700 chain_id, pool_address, block, transaction_hash, transaction_index,
701 log_index, event_type, sender, owner, position_liquidity::u128,
702 amount0::U256, amount1::U256, tick_lower, tick_upper
703 FROM UNNEST(
704 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
705 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[],
706 $11::TEXT[], $12::TEXT[], $13::INT[], $14::INT[]
707 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
708 log_index, event_type, sender, owner, position_liquidity,
709 amount0, amount1, tick_lower, tick_upper)
710 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
711 ",
712 )
713 .bind(&chain_ids[..])
714 .bind(&pool_addresses[..])
715 .bind(&blocks[..])
716 .bind(&transaction_hashes[..])
717 .bind(&transaction_indices[..])
718 .bind(&log_indices[..])
719 .bind(&event_types[..])
720 .bind(&senders[..])
721 .bind(&owners[..])
722 .bind(&position_liquidities[..])
723 .bind(&amount0s[..])
724 .bind(&amount1s[..])
725 .bind(&tick_lowers[..])
726 .bind(&tick_uppers[..])
727 .execute(&self.pool)
728 .await
729 .map(|_| ())
730 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
731 }
732
733 pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
739 sqlx::query(
740 r"
741 INSERT INTO token (
742 chain_id, address, name, symbol, decimals
743 ) VALUES ($1, $2, $3, $4, $5)
744 ON CONFLICT (chain_id, address)
745 DO UPDATE
746 SET
747 name = $3,
748 symbol = $4,
749 decimals = $5
750 ",
751 )
752 .bind(token.chain.chain_id as i32)
753 .bind(token.address.to_string())
754 .bind(token.name.as_str())
755 .bind(token.symbol.as_str())
756 .bind(i32::from(token.decimals))
757 .execute(&self.pool)
758 .await
759 .map(|_| ())
760 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
761 }
762
763 pub async fn add_invalid_token(
769 &self,
770 chain_id: u32,
771 address: &Address,
772 error_string: &str,
773 ) -> anyhow::Result<()> {
774 sqlx::query(
775 r"
776 INSERT INTO token (
777 chain_id, address, error
778 ) VALUES ($1, $2, $3)
779 ON CONFLICT (chain_id, address)
780 DO NOTHING;
781 ",
782 )
783 .bind(chain_id as i32)
784 .bind(address.to_string())
785 .bind(error_string)
786 .execute(&self.pool)
787 .await
788 .map(|_| ())
789 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
790 }
791
792 pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
798 sqlx::query(
799 r"
800 INSERT INTO pool_swap_event (
801 chain_id, pool_address, block, transaction_hash, transaction_index,
802 log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
803 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
804 ON CONFLICT (chain_id, transaction_hash, log_index)
805 DO NOTHING
806 ",
807 )
808 .bind(chain_id as i32)
809 .bind(swap.pool_address.to_string())
810 .bind(swap.block as i64)
811 .bind(swap.transaction_hash.as_str())
812 .bind(swap.transaction_index as i32)
813 .bind(swap.log_index as i32)
814 .bind(swap.sender.to_string())
815 .bind(swap.recipient.to_string())
816 .bind(swap.side.map(|side| side.to_string()))
817 .bind(swap.size.map(|size| size.to_string()))
818 .bind(swap.price.map(|price| price.to_string()))
819 .bind(swap.sqrt_price_x96.to_string())
820 .bind(swap.amount0.to_string())
821 .bind(swap.amount1.to_string())
822 .execute(&self.pool)
823 .await
824 .map(|_| ())
825 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
826 }
827
828 pub async fn add_pool_liquidity_update(
834 &self,
835 chain_id: u32,
836 liquidity_update: &PoolLiquidityUpdate,
837 ) -> anyhow::Result<()> {
838 sqlx::query(
839 r"
840 INSERT INTO pool_liquidity_event (
841 chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
842 event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
843 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
844 ON CONFLICT (chain_id, transaction_hash, log_index)
845 DO NOTHING
846 ",
847 )
848 .bind(chain_id as i32)
849 .bind(liquidity_update.pool_address.to_string())
850 .bind(liquidity_update.block as i64)
851 .bind(liquidity_update.transaction_hash.as_str())
852 .bind(liquidity_update.transaction_index as i32)
853 .bind(liquidity_update.log_index as i32)
854 .bind(liquidity_update.kind.to_string())
855 .bind(liquidity_update.sender.map(|sender| sender.to_string()))
856 .bind(liquidity_update.owner.to_string())
857 .bind(U128Pg(liquidity_update.position_liquidity))
858 .bind(U256Pg(liquidity_update.amount0))
859 .bind(U256Pg(liquidity_update.amount1))
860 .bind(liquidity_update.tick_lower)
861 .bind(liquidity_update.tick_upper)
862 .execute(&self.pool)
863 .await
864 .map(|_| ())
865 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
866 }
867
868 pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
877 sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
878 .bind(chain.chain_id as i32)
879 .fetch_all(&self.pool)
880 .await
881 .map(|rows| {
882 rows.into_iter()
883 .map(|token_row| {
884 Token::new(
885 chain.clone(),
886 token_row.address,
887 token_row.name,
888 token_row.symbol,
889 token_row.decimals as u8,
890 )
891 })
892 .collect::<Vec<_>>()
893 })
894 .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
895 }
896
897 pub async fn load_invalid_token_addresses(
903 &self,
904 chain_id: u32,
905 ) -> anyhow::Result<Vec<Address>> {
906 sqlx::query_as::<_, (String,)>(
907 "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
908 )
909 .bind(chain_id as i32)
910 .fetch_all(&self.pool)
911 .await?
912 .into_iter()
913 .map(|(address,)| validate_address(&address))
914 .collect::<Result<Vec<_>, _>>()
915 .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
916 }
917
918 pub async fn load_pools(
924 &self,
925 chain: SharedChain,
926 dex_id: &str,
927 ) -> anyhow::Result<Vec<PoolRow>> {
928 sqlx::query_as::<_, PoolRow>(
929 r"
930 SELECT
931 address,
932 dex_name,
933 creation_block,
934 token0_chain,
935 token0_address,
936 token1_chain,
937 token1_address,
938 fee,
939 tick_spacing,
940 initial_tick,
941 initial_sqrt_price_x96
942 FROM pool
943 WHERE chain_id = $1 AND dex_name = $2
944 ORDER BY creation_block ASC
945 ",
946 )
947 .bind(chain.chain_id as i32)
948 .bind(dex_id)
949 .fetch_all(&self.pool)
950 .await
951 .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
952 }
953
954 pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
968 if enable {
969 tracing::info!("Enabling performance sync settings for bulk operations");
970
971 sqlx::query("SET synchronous_commit = OFF")
973 .execute(&self.pool)
974 .await
975 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
976
977 sqlx::query("SET work_mem = '256MB'")
979 .execute(&self.pool)
980 .await
981 .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
982
983 tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
984 } else {
985 tracing::info!("Restoring default safe database performance settings");
986
987 sqlx::query("SET synchronous_commit = ON")
989 .execute(&self.pool)
990 .await
991 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
992
993 sqlx::query("RESET work_mem")
995 .execute(&self.pool)
996 .await
997 .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
998 }
999
1000 Ok(())
1001 }
1002
1003 pub async fn update_dex_last_synced_block(
1009 &self,
1010 chain_id: u32,
1011 dex: &DexType,
1012 block_number: u64,
1013 ) -> anyhow::Result<()> {
1014 sqlx::query(
1015 r"
1016 UPDATE dex
1017 SET last_full_sync_pools_block_number = $3
1018 WHERE chain_id = $1 AND name = $2
1019 ",
1020 )
1021 .bind(chain_id as i32)
1022 .bind(dex.to_string())
1023 .bind(block_number as i64)
1024 .execute(&self.pool)
1025 .await
1026 .map(|_| ())
1027 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1028 }
1029
1030 pub async fn update_pool_last_synced_block(
1031 &self,
1032 chain_id: u32,
1033 dex: &DexType,
1034 pool_address: &Address,
1035 block_number: u64,
1036 ) -> anyhow::Result<()> {
1037 sqlx::query(
1038 r"
1039 UPDATE pool
1040 SET last_full_sync_block_number = $4
1041 WHERE chain_id = $1
1042 AND dex_name = $2
1043 AND address = $3
1044 ",
1045 )
1046 .bind(chain_id as i32)
1047 .bind(dex.to_string())
1048 .bind(pool_address.to_string())
1049 .bind(block_number as i64)
1050 .execute(&self.pool)
1051 .await
1052 .map(|_| ())
1053 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1054 }
1055
1056 pub async fn get_dex_last_synced_block(
1062 &self,
1063 chain_id: u32,
1064 dex: &DexType,
1065 ) -> anyhow::Result<Option<u64>> {
1066 let result = sqlx::query_as::<_, (Option<i64>,)>(
1067 r#"
1068 SELECT
1069 last_full_sync_pools_block_number
1070 FROM dex
1071 WHERE chain_id = $1
1072 AND name = $2
1073 "#,
1074 )
1075 .bind(chain_id as i32)
1076 .bind(dex.to_string())
1077 .fetch_optional(&self.pool)
1078 .await
1079 .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1080
1081 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1082 }
1083
1084 pub async fn get_pool_last_synced_block(
1085 &self,
1086 chain_id: u32,
1087 dex: &DexType,
1088 pool_address: &Address,
1089 ) -> anyhow::Result<Option<u64>> {
1090 let result = sqlx::query_as::<_, (Option<i64>,)>(
1091 r#"
1092 SELECT
1093 last_full_sync_block_number
1094 FROM pool
1095 WHERE chain_id = $1
1096 AND dex_name = $2
1097 AND address = $3
1098 "#,
1099 )
1100 .bind(chain_id as i32)
1101 .bind(dex.to_string())
1102 .bind(pool_address.to_string())
1103 .fetch_optional(&self.pool)
1104 .await
1105 .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1106
1107 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1108 }
1109
1110 pub async fn get_table_last_block(
1117 &self,
1118 chain_id: u32,
1119 table_name: &str,
1120 pool_address: &Address,
1121 ) -> anyhow::Result<Option<u64>> {
1122 let query = format!(
1123 "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
1124 table_name
1125 );
1126 let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1127 .bind(chain_id as i32)
1128 .bind(pool_address.to_string())
1129 .fetch_optional(&self.pool)
1130 .await
1131 .map_err(|e| {
1132 anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
1133 })?;
1134
1135 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1136 }
1137
1138 pub async fn add_pool_collects_batch(
1144 &self,
1145 chain_id: u32,
1146 collects: &[PoolFeeCollect],
1147 ) -> anyhow::Result<()> {
1148 if collects.is_empty() {
1149 return Ok(());
1150 }
1151
1152 let len = collects.len();
1154 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1155 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1156 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1157 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1158 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1159 let mut owners: Vec<String> = Vec::with_capacity(len);
1160 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1161 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1162 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1163 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1164 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1165
1166 for collect in collects {
1168 chain_ids.push(chain_id as i32);
1169 pool_addresses.push(collect.pool_address.to_string());
1170 blocks.push(collect.block as i64);
1171 transaction_hashes.push(collect.transaction_hash.clone());
1172 transaction_indices.push(collect.transaction_index as i32);
1173 log_indices.push(collect.log_index as i32);
1174 owners.push(collect.owner.to_string());
1175 amount0s.push(collect.amount0.to_string());
1176 amount1s.push(collect.amount1.to_string());
1177 tick_lowers.push(collect.tick_lower);
1178 tick_uppers.push(collect.tick_upper);
1179 }
1180
1181 sqlx::query(
1183 r"
1184 INSERT INTO pool_collect_event (
1185 chain_id, pool_address, block, transaction_hash, transaction_index,
1186 log_index, owner, amount0, amount1, tick_lower, tick_upper
1187 )
1188 SELECT
1189 chain_id, pool_address, block, transaction_hash, transaction_index,
1190 log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1191 FROM UNNEST(
1192 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1193 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::INT[], $11::INT[]
1194 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1195 log_index, owner, amount0, amount1, tick_lower, tick_upper)
1196 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1197 ",
1198 )
1199 .bind(&chain_ids[..])
1200 .bind(&pool_addresses[..])
1201 .bind(&blocks[..])
1202 .bind(&transaction_hashes[..])
1203 .bind(&transaction_indices[..])
1204 .bind(&log_indices[..])
1205 .bind(&owners[..])
1206 .bind(&amount0s[..])
1207 .bind(&amount1s[..])
1208 .bind(&tick_lowers[..])
1209 .bind(&tick_uppers[..])
1210 .execute(&self.pool)
1211 .await
1212 .map(|_| ())
1213 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1214 }
1215
1216 pub async fn add_pool_flash_batch(
1222 &self,
1223 chain_id: u32,
1224 flash_events: &[PoolFlash],
1225 ) -> anyhow::Result<()> {
1226 if flash_events.is_empty() {
1227 return Ok(());
1228 }
1229
1230 let len = flash_events.len();
1232 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1233 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1234 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1235 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1236 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1237 let mut senders: Vec<String> = Vec::with_capacity(len);
1238 let mut recipients: Vec<String> = Vec::with_capacity(len);
1239 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1240 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1241 let mut paid0s: Vec<String> = Vec::with_capacity(len);
1242 let mut paid1s: Vec<String> = Vec::with_capacity(len);
1243 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1244
1245 for flash in flash_events {
1247 chain_ids.push(chain_id as i32);
1248 pool_addresses.push(flash.pool_address.to_string());
1249 blocks.push(flash.block as i64);
1250 transaction_hashes.push(flash.transaction_hash.clone());
1251 transaction_indices.push(flash.transaction_index as i32);
1252 log_indices.push(flash.log_index as i32);
1253 senders.push(flash.sender.to_string());
1254 recipients.push(flash.recipient.to_string());
1255 amount0s.push(flash.amount0.to_string());
1256 amount1s.push(flash.amount1.to_string());
1257 paid0s.push(flash.paid0.to_string());
1258 paid1s.push(flash.paid1.to_string());
1259 }
1260
1261 sqlx::query(
1263 r"
1264 INSERT INTO pool_flash_event (
1265 chain_id, pool_address, block, transaction_hash, transaction_index,
1266 log_index, sender, recipient, amount0, amount1, paid0, paid1
1267 )
1268 SELECT
1269 chain_id, pool_address, block, transaction_hash, transaction_index,
1270 log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1271 FROM UNNEST(
1272 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1273 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[]
1274 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1275 log_index, sender, recipient, amount0, amount1, paid0, paid1)
1276 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1277 ",
1278 )
1279 .bind(&chain_ids[..])
1280 .bind(&pool_addresses[..])
1281 .bind(&blocks[..])
1282 .bind(&transaction_hashes[..])
1283 .bind(&transaction_indices[..])
1284 .bind(&log_indices[..])
1285 .bind(&senders[..])
1286 .bind(&recipients[..])
1287 .bind(&amount0s[..])
1288 .bind(&amount1s[..])
1289 .bind(&paid0s[..])
1290 .bind(&paid1s[..])
1291 .execute(&self.pool)
1292 .await
1293 .map(|_| ())
1294 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1295 }
1296
1297 pub async fn add_pool_snapshot(
1298 &self,
1299 chain_id: u32,
1300 pool_address: &Address,
1301 snapshot: &PoolSnapshot,
1302 ) -> anyhow::Result<()> {
1303 sqlx::query(
1304 r"
1305 INSERT INTO pool_snapshot (
1306 chain_id, pool_address, block, transaction_index, log_index, transaction_hash,
1307 current_tick, price_sqrt_ratio_x96, liquidity,
1308 protocol_fees_token0, protocol_fees_token1, fee_protocol,
1309 fee_growth_global_0, fee_growth_global_1,
1310 total_amount0_deposited, total_amount1_deposited,
1311 total_amount0_collected, total_amount1_collected,
1312 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes
1313 ) VALUES (
1314 $1, $2, $3, $4, $5, $6,
1315 $7, $8::U160, $9::U128, $10::U256, $11::U256, $12,
1316 $13::U256, $14::U256, $15::U256, $16::U256, $17::U256, $18::U256,
1317 $19, $20, $21, $22, $23
1318 )
1319 ON CONFLICT (chain_id, pool_address, block, transaction_index, log_index)
1320 DO NOTHING
1321 ",
1322 )
1323 .bind(chain_id as i32)
1324 .bind(pool_address.to_string())
1325 .bind(snapshot.block_position.number as i64)
1326 .bind(snapshot.block_position.transaction_index as i32)
1327 .bind(snapshot.block_position.log_index as i32)
1328 .bind(snapshot.block_position.transaction_hash.to_string())
1329 .bind(snapshot.state.current_tick)
1330 .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1331 .bind(snapshot.state.liquidity.to_string())
1332 .bind(snapshot.state.protocol_fees_token0.to_string())
1333 .bind(snapshot.state.protocol_fees_token1.to_string())
1334 .bind(snapshot.state.fee_protocol as i16)
1335 .bind(snapshot.state.fee_growth_global_0.to_string())
1336 .bind(snapshot.state.fee_growth_global_1.to_string())
1337 .bind(snapshot.analytics.total_amount0_deposited.to_string())
1338 .bind(snapshot.analytics.total_amount1_deposited.to_string())
1339 .bind(snapshot.analytics.total_amount0_collected.to_string())
1340 .bind(snapshot.analytics.total_amount1_collected.to_string())
1341 .bind(snapshot.analytics.total_swaps as i32)
1342 .bind(snapshot.analytics.total_mints as i32)
1343 .bind(snapshot.analytics.total_burns as i32)
1344 .bind(snapshot.analytics.total_fee_collects as i32)
1345 .bind(snapshot.analytics.total_flashes as i32)
1346 .execute(&self.pool)
1347 .await
1348 .map(|_| ())
1349 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1350 }
1351
1352 pub async fn add_pool_positions_batch(
1358 &self,
1359 chain_id: u32,
1360 snapshot_block: u64,
1361 snapshot_transaction_index: u32,
1362 snapshot_log_index: u32,
1363 positions: &[(Address, PoolPosition)],
1364 ) -> anyhow::Result<()> {
1365 if positions.is_empty() {
1366 return Ok(());
1367 }
1368
1369 let len = positions.len();
1371 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1372 let mut owners: Vec<String> = Vec::with_capacity(len);
1373 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1374 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1375 let mut liquidities: Vec<String> = Vec::with_capacity(len);
1376 let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1377 let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1378 let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1379 let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1380 let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1381 let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1382 let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1383 let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1384
1385 for (pool_address, position) in positions {
1387 pool_addresses.push(pool_address.to_string());
1388 owners.push(position.owner.to_string());
1389 tick_lowers.push(position.tick_lower);
1390 tick_uppers.push(position.tick_upper);
1391 liquidities.push(position.liquidity.to_string());
1392 fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1393 fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1394 tokens_owed_0s.push(position.tokens_owed_0.to_string());
1395 tokens_owed_1s.push(position.tokens_owed_1.to_string());
1396 total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1397 total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1398 total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1399 total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1400 }
1401
1402 sqlx::query(
1404 r"
1405 INSERT INTO pool_position (
1406 chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1407 owner, tick_lower, tick_upper,
1408 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1409 tokens_owed_0, tokens_owed_1,
1410 total_amount0_deposited, total_amount1_deposited,
1411 total_amount0_collected, total_amount1_collected
1412 )
1413 SELECT
1414 $1, pool_address, $2, $3, $4,
1415 owner, tick_lower, tick_upper,
1416 liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1417 tokens_owed_0::U128, tokens_owed_1::U128,
1418 total_amount0_deposited::U256, total_amount1_deposited::U256,
1419 total_amount0_collected::U128, total_amount1_collected::U128
1420 FROM UNNEST(
1421 $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1422 $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1423 $16::TEXT[], $17::TEXT[]
1424 ) AS t(pool_address, owner, tick_lower, tick_upper,
1425 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1426 tokens_owed_0, tokens_owed_1,
1427 total_amount0_deposited, total_amount1_deposited,
1428 total_amount0_collected, total_amount1_collected)
1429 ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1430 DO NOTHING
1431 ",
1432 )
1433 .bind(chain_id as i32)
1434 .bind(snapshot_block as i64)
1435 .bind(snapshot_transaction_index as i32)
1436 .bind(snapshot_log_index as i32)
1437 .bind(&pool_addresses[..])
1438 .bind(&owners[..])
1439 .bind(&tick_lowers[..])
1440 .bind(&tick_uppers[..])
1441 .bind(&liquidities[..])
1442 .bind(&fee_growth_inside_0_lasts[..])
1443 .bind(&fee_growth_inside_1_lasts[..])
1444 .bind(&tokens_owed_0s[..])
1445 .bind(&tokens_owed_1s[..])
1446 .bind(&total_amount0_depositeds as &[Option<String>])
1447 .bind(&total_amount1_depositeds as &[Option<String>])
1448 .bind(&total_amount0_collecteds as &[Option<String>])
1449 .bind(&total_amount1_collecteds as &[Option<String>])
1450 .execute(&self.pool)
1451 .await
1452 .map(|_| ())
1453 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1454 }
1455
1456 pub async fn add_pool_ticks_batch(
1462 &self,
1463 chain_id: u32,
1464 snapshot_block: u64,
1465 snapshot_transaction_index: u32,
1466 snapshot_log_index: u32,
1467 ticks: &[(Address, &PoolTick)],
1468 ) -> anyhow::Result<()> {
1469 if ticks.is_empty() {
1470 return Ok(());
1471 }
1472
1473 let len = ticks.len();
1475 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1476 let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1477 let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1478 let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1479 let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1480 let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1481 let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1482 let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1483
1484 for (pool_address, tick) in ticks {
1486 pool_addresses.push(pool_address.to_string());
1487 tick_values.push(tick.value);
1488 liquidity_grosses.push(tick.liquidity_gross.to_string());
1489 liquidity_nets.push(tick.liquidity_net.to_string());
1490 fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1491 fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1492 initializeds.push(tick.initialized);
1493 last_updated_blocks.push(tick.last_updated_block as i64);
1494 }
1495
1496 sqlx::query(
1498 r"
1499 INSERT INTO pool_tick (
1500 chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1501 tick_value, liquidity_gross, liquidity_net,
1502 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1503 )
1504 SELECT
1505 $1, pool_address, $2, $3, $4,
1506 tick_value, liquidity_gross::U128, liquidity_net::I128,
1507 fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1508 FROM UNNEST(
1509 $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1510 $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1511 ) AS t(pool_address, tick_value, liquidity_gross, liquidity_net,
1512 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1513 ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1514 DO NOTHING
1515 ",
1516 )
1517 .bind(chain_id as i32)
1518 .bind(snapshot_block as i64)
1519 .bind(snapshot_transaction_index as i32)
1520 .bind(snapshot_log_index as i32)
1521 .bind(&pool_addresses[..])
1522 .bind(&tick_values[..])
1523 .bind(&liquidity_grosses[..])
1524 .bind(&liquidity_nets[..])
1525 .bind(&fee_growth_outside_0s[..])
1526 .bind(&fee_growth_outside_1s[..])
1527 .bind(&initializeds[..])
1528 .bind(&last_updated_blocks[..])
1529 .execute(&self.pool)
1530 .await
1531 .map(|_| ())
1532 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1533 }
1534
1535 pub async fn update_pool_initial_price_tick(
1536 &self,
1537 chain_id: u32,
1538 initialize_event: &InitializeEvent,
1539 ) -> anyhow::Result<()> {
1540 sqlx::query(
1541 r"
1542 UPDATE pool
1543 SET
1544 initial_tick = $4,
1545 initial_sqrt_price_x96 = $5
1546 WHERE chain_id = $1
1547 AND dex_name = $2
1548 AND address = $3
1549 ",
1550 )
1551 .bind(chain_id as i32)
1552 .bind(initialize_event.dex.name.to_string())
1553 .bind(initialize_event.pool_address.to_string())
1554 .bind(initialize_event.tick)
1555 .bind(initialize_event.sqrt_price_x96.to_string())
1556 .execute(&self.pool)
1557 .await
1558 .map(|_| ())
1559 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1560 }
1561
1562 pub async fn load_latest_valid_pool_snapshot(
1570 &self,
1571 chain_id: u32,
1572 pool_address: &Address,
1573 ) -> anyhow::Result<Option<PoolSnapshot>> {
1574 let result = sqlx::query(
1575 r"
1576 SELECT
1577 block, transaction_index, log_index, transaction_hash,
1578 current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1579 protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1580 fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1581 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1582 total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1583 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1584 (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1585 FROM pool_snapshot
1586 WHERE chain_id = $1 AND pool_address = $2 AND is_valid = TRUE
1587 ORDER BY block DESC, transaction_index DESC, log_index DESC
1588 LIMIT 1
1589 ",
1590 )
1591 .bind(chain_id as i32)
1592 .bind(pool_address.to_string())
1593 .fetch_optional(&self.pool)
1594 .await
1595 .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {}", e))?;
1596
1597 if let Some(row) = result {
1598 let block: i64 = row.get("block");
1600 let transaction_index: i32 = row.get("transaction_index");
1601 let log_index: i32 = row.get("log_index");
1602 let transaction_hash: String = row.get("transaction_hash");
1603
1604 let block_position = nautilus_model::defi::data::block::BlockPosition::new(
1605 block as u64,
1606 transaction_hash,
1607 transaction_index as u32,
1608 log_index as u32,
1609 );
1610
1611 let state = PoolState {
1612 current_tick: row.get("current_tick"),
1613 price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1614 liquidity: row.get::<String, _>("liquidity").parse()?,
1615 protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1616 protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1617 fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1618 fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1619 fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1620 };
1621
1622 let analytics = PoolAnalytics {
1623 total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1624 total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1625 total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1626 total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1627 total_swaps: row.get::<i32, _>("total_swaps") as u64,
1628 total_mints: row.get::<i32, _>("total_mints") as u64,
1629 total_burns: row.get::<i32, _>("total_burns") as u64,
1630 total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1631 total_flashes: row.get::<i32, _>("total_flashes") as u64,
1632 #[cfg(debug_assertions)]
1633 swap_processing_time: std::time::Duration::ZERO,
1634 #[cfg(debug_assertions)]
1635 mint_processing_time: std::time::Duration::ZERO,
1636 #[cfg(debug_assertions)]
1637 burn_processing_time: std::time::Duration::ZERO,
1638 #[cfg(debug_assertions)]
1639 collect_processing_time: std::time::Duration::ZERO,
1640 };
1641
1642 let positions = self
1644 .load_pool_positions_for_snapshot(
1645 chain_id,
1646 pool_address,
1647 block as u64,
1648 transaction_index as u32,
1649 log_index as u32,
1650 )
1651 .await?;
1652
1653 let ticks = self
1654 .load_pool_ticks_for_snapshot(
1655 chain_id,
1656 pool_address,
1657 block as u64,
1658 transaction_index as u32,
1659 log_index as u32,
1660 )
1661 .await?;
1662
1663 let dex_name: String = row.get("dex_name");
1664 let chain = nautilus_model::defi::Chain::from_chain_id(chain_id)
1665 .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {}", chain_id))?;
1666
1667 let dex_type = nautilus_model::defi::DexType::from_dex_name(&dex_name)
1668 .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {}", dex_name))?;
1669
1670 let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1671 .ok_or_else(|| {
1672 anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1673 })?;
1674
1675 let instrument_id =
1676 Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_address);
1677
1678 Ok(Some(PoolSnapshot::new(
1679 instrument_id,
1680 state,
1681 positions,
1682 ticks,
1683 analytics,
1684 block_position,
1685 )))
1686 } else {
1687 Ok(None)
1688 }
1689 }
1690
1691 pub async fn mark_pool_snapshot_valid(
1697 &self,
1698 chain_id: u32,
1699 pool_address: &Address,
1700 block: u64,
1701 transaction_index: u32,
1702 log_index: u32,
1703 ) -> anyhow::Result<()> {
1704 sqlx::query(
1705 r"
1706 UPDATE pool_snapshot
1707 SET is_valid = TRUE
1708 WHERE chain_id = $1
1709 AND pool_address = $2
1710 AND block = $3
1711 AND transaction_index = $4
1712 AND log_index = $5
1713 ",
1714 )
1715 .bind(chain_id as i32)
1716 .bind(pool_address.to_string())
1717 .bind(block as i64)
1718 .bind(transaction_index as i32)
1719 .bind(log_index as i32)
1720 .execute(&self.pool)
1721 .await
1722 .map(|_| ())
1723 .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {}", e))
1724 }
1725
1726 pub async fn load_pool_positions_for_snapshot(
1732 &self,
1733 chain_id: u32,
1734 pool_address: &Address,
1735 snapshot_block: u64,
1736 snapshot_transaction_index: u32,
1737 snapshot_log_index: u32,
1738 ) -> anyhow::Result<Vec<PoolPosition>> {
1739 let rows = sqlx::query(
1740 r"
1741 SELECT
1742 owner, tick_lower, tick_upper,
1743 liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1744 tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1745 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1746 total_amount0_collected::TEXT, total_amount1_collected::TEXT
1747 FROM pool_position
1748 WHERE chain_id = $1
1749 AND pool_address = $2
1750 AND snapshot_block = $3
1751 AND snapshot_transaction_index = $4
1752 AND snapshot_log_index = $5
1753 ",
1754 )
1755 .bind(chain_id as i32)
1756 .bind(pool_address.to_string())
1757 .bind(snapshot_block as i64)
1758 .bind(snapshot_transaction_index as i32)
1759 .bind(snapshot_log_index as i32)
1760 .fetch_all(&self.pool)
1761 .await
1762 .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {}", e))?;
1763
1764 rows.iter()
1765 .map(|row| {
1766 let owner: String = row.get("owner");
1767 let position = PoolPosition {
1768 owner: validate_address(&owner)?,
1769 tick_lower: row.get("tick_lower"),
1770 tick_upper: row.get("tick_upper"),
1771 liquidity: row.get::<String, _>("liquidity").parse()?,
1772 fee_growth_inside_0_last: row
1773 .get::<String, _>("fee_growth_inside_0_last")
1774 .parse()?,
1775 fee_growth_inside_1_last: row
1776 .get::<String, _>("fee_growth_inside_1_last")
1777 .parse()?,
1778 tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1779 tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1780 total_amount0_deposited: row
1781 .get::<String, _>("total_amount0_deposited")
1782 .parse()?,
1783 total_amount1_deposited: row
1784 .get::<String, _>("total_amount1_deposited")
1785 .parse()?,
1786 total_amount0_collected: row
1787 .get::<String, _>("total_amount0_collected")
1788 .parse()?,
1789 total_amount1_collected: row
1790 .get::<String, _>("total_amount1_collected")
1791 .parse()?,
1792 };
1793 Ok(position)
1794 })
1795 .collect()
1796 }
1797
1798 pub async fn load_pool_ticks_for_snapshot(
1804 &self,
1805 chain_id: u32,
1806 pool_address: &Address,
1807 snapshot_block: u64,
1808 snapshot_transaction_index: u32,
1809 snapshot_log_index: u32,
1810 ) -> anyhow::Result<Vec<PoolTick>> {
1811 let rows = sqlx::query(
1812 r"
1813 SELECT
1814 tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1815 fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1816 last_updated_block
1817 FROM pool_tick
1818 WHERE chain_id = $1
1819 AND pool_address = $2
1820 AND snapshot_block = $3
1821 AND snapshot_transaction_index = $4
1822 AND snapshot_log_index = $5
1823 ",
1824 )
1825 .bind(chain_id as i32)
1826 .bind(pool_address.to_string())
1827 .bind(snapshot_block as i64)
1828 .bind(snapshot_transaction_index as i32)
1829 .bind(snapshot_log_index as i32)
1830 .fetch_all(&self.pool)
1831 .await
1832 .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {}", e))?;
1833
1834 rows.iter()
1835 .map(|row| {
1836 let tick = PoolTick::new(
1837 row.get("tick_value"),
1838 row.get::<String, _>("liquidity_gross").parse()?,
1839 row.get::<String, _>("liquidity_net").parse()?,
1840 row.get::<String, _>("fee_growth_outside_0").parse()?,
1841 row.get::<String, _>("fee_growth_outside_1").parse()?,
1842 row.get("initialized"),
1843 row.get::<i64, _>("last_updated_block") as u64,
1844 );
1845 Ok(tick)
1846 })
1847 .collect()
1848 }
1849
1850 pub fn stream_pool_events<'a>(
1863 &'a self,
1864 chain: SharedChain,
1865 dex: SharedDex,
1866 instrument_id: InstrumentId,
1867 pool_address: &Address,
1868 from_position: Option<BlockPosition>,
1869 ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1870 const QUERY_ALL: &str = r"
1872 (SELECT
1873 'swap' as event_type,
1874 chain_id,
1875 pool_address,
1876 block,
1877 transaction_hash,
1878 transaction_index,
1879 log_index,
1880 sender,
1881 recipient,
1882 NULL::TEXT as owner,
1883 side,
1884 size,
1885 price,
1886 sqrt_price_x96::TEXT,
1887 liquidity::TEXT as swap_liquidity,
1888 tick as swap_tick,
1889 amount0::TEXT as swap_amount0,
1890 amount1::TEXT as swap_amount1,
1891 NULL::TEXT as position_liquidity,
1892 NULL::TEXT as amount0,
1893 NULL::TEXT as amount1,
1894 NULL::INT as tick_lower,
1895 NULL::INT as tick_upper,
1896 NULL::TEXT as liquidity_event_type,
1897 NULL::TEXT as flash_amount0,
1898 NULL::TEXT as flash_amount1,
1899 NULL::TEXT as flash_paid0,
1900 NULL::TEXT as flash_paid1
1901 FROM pool_swap_event
1902 WHERE chain_id = $1 AND pool_address = $2)
1903 UNION ALL
1904 (SELECT
1905 'liquidity' as event_type,
1906 chain_id,
1907 pool_address,
1908 block,
1909 transaction_hash,
1910 transaction_index,
1911 log_index,
1912 sender,
1913 NULL::TEXT as recipient,
1914 owner,
1915 NULL::TEXT as side,
1916 NULL::TEXT as size,
1917 NULL::text as price,
1918 NULL::text as sqrt_price_x96,
1919 NULL::TEXT as swap_liquidity,
1920 NULL::INT as swap_tick,
1921 amount0::TEXT as swap_amount0,
1922 amount1::TEXT as swap_amount1,
1923 position_liquidity::TEXT,
1924 amount0::TEXT,
1925 amount1::TEXT,
1926 tick_lower::INT,
1927 tick_upper::INT,
1928 event_type as liquidity_event_type,
1929 NULL::TEXT as flash_amount0,
1930 NULL::TEXT as flash_amount1,
1931 NULL::TEXT as flash_paid0,
1932 NULL::TEXT as flash_paid1
1933 FROM pool_liquidity_event
1934 WHERE chain_id = $1 AND pool_address = $2)
1935 UNION ALL
1936 (SELECT
1937 'collect' as event_type,
1938 chain_id,
1939 pool_address,
1940 block,
1941 transaction_hash,
1942 transaction_index,
1943 log_index,
1944 NULL::TEXT as sender,
1945 NULL::TEXT as recipient,
1946 owner,
1947 NULL::TEXT as side,
1948 NULL::TEXT as size,
1949 NULL::TEXT as price,
1950 NULL::TEXT as sqrt_price_x96,
1951 NULL::TEXT as swap_liquidity,
1952 NULL::INT AS swap_tick,
1953 amount0::TEXT as swap_amount0,
1954 amount1::TEXT as swap_amount1,
1955 NULL::TEXT as position_liquidity,
1956 amount0::TEXT,
1957 amount1::TEXT,
1958 tick_lower::INT,
1959 tick_upper::INT,
1960 NULL::TEXT as liquidity_event_type,
1961 NULL::TEXT as flash_amount0,
1962 NULL::TEXT as flash_amount1,
1963 NULL::TEXT as flash_paid0,
1964 NULL::TEXT as flash_paid1
1965 FROM pool_collect_event
1966 WHERE chain_id = $1 AND pool_address = $2)
1967 UNION ALL
1968 (SELECT
1969 'flash' as event_type,
1970 chain_id,
1971 pool_address,
1972 block,
1973 transaction_hash,
1974 transaction_index,
1975 log_index,
1976 sender,
1977 recipient,
1978 NULL::TEXT as owner,
1979 NULL::TEXT as side,
1980 NULL::TEXT as size,
1981 NULL::TEXT as price,
1982 NULL::TEXT as sqrt_price_x96,
1983 NULL::TEXT as swap_liquidity,
1984 NULL::INT AS swap_tick,
1985 NULL::TEXT as swap_amount0,
1986 NULL::TEXT as swap_amount1,
1987 NULL::TEXT as position_liquidity,
1988 NULL::TEXT as amount0,
1989 NULL::TEXT as amount1,
1990 NULL::INT as tick_lower,
1991 NULL::INT as tick_upper,
1992 NULL::TEXT as liquidity_event_type,
1993 amount0::TEXT as flash_amount0,
1994 amount1::TEXT as flash_amount1,
1995 paid0::TEXT as flash_paid0,
1996 paid1::TEXT as flash_paid1
1997 FROM pool_flash_event
1998 WHERE chain_id = $1 AND pool_address = $2)
1999 ORDER BY block, transaction_index, log_index";
2000
2001 const QUERY_FROM_POSITION: &str = r"
2003 (SELECT
2004 'swap' as event_type,
2005 chain_id,
2006 pool_address,
2007 block,
2008 transaction_hash,
2009 transaction_index,
2010 log_index,
2011 sender,
2012 recipient,
2013 NULL::TEXT as owner,
2014 side,
2015 size,
2016 price,
2017 sqrt_price_x96::TEXT,
2018 liquidity::TEXT as swap_liquidity,
2019 tick as swap_tick,
2020 amount0::TEXT as swap_amount0,
2021 amount1::TEXT as swap_amount1,
2022 NULL::TEXT as position_liquidity,
2023 NULL::TEXT as amount0,
2024 NULL::TEXT as amount1,
2025 NULL::INT as tick_lower,
2026 NULL::INT as tick_upper,
2027 NULL::TEXT as liquidity_event_type,
2028 NULL::TEXT as flash_amount0,
2029 NULL::TEXT as flash_amount1,
2030 NULL::TEXT as flash_paid0,
2031 NULL::TEXT as flash_paid1
2032 FROM pool_swap_event
2033 WHERE chain_id = $1 AND pool_address = $2
2034 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2035 UNION ALL
2036 (SELECT
2037 'liquidity' as event_type,
2038 chain_id,
2039 pool_address,
2040 block,
2041 transaction_hash,
2042 transaction_index,
2043 log_index,
2044 sender,
2045 NULL::TEXT as recipient,
2046 owner,
2047 NULL::TEXT as side,
2048 NULL::TEXT as size,
2049 NULL::text as price,
2050 NULL::text as sqrt_price_x96,
2051 NULL::TEXT as swap_liquidity,
2052 NULL::INT as swap_tick,
2053 amount0::TEXT as swap_amount0,
2054 amount1::TEXT as swap_amount1,
2055 position_liquidity::TEXT,
2056 amount0::TEXT,
2057 amount1::TEXT,
2058 tick_lower::INT,
2059 tick_upper::INT,
2060 event_type as liquidity_event_type,
2061 NULL::TEXT as flash_amount0,
2062 NULL::TEXT as flash_amount1,
2063 NULL::TEXT as flash_paid0,
2064 NULL::TEXT as flash_paid1
2065 FROM pool_liquidity_event
2066 WHERE chain_id = $1 AND pool_address = $2
2067 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2068 UNION ALL
2069 (SELECT
2070 'collect' as event_type,
2071 chain_id,
2072 pool_address,
2073 block,
2074 transaction_hash,
2075 transaction_index,
2076 log_index,
2077 NULL::TEXT as sender,
2078 NULL::TEXT as recipient,
2079 owner,
2080 NULL::TEXT as side,
2081 NULL::TEXT as size,
2082 NULL::TEXT as price,
2083 NULL::TEXT as sqrt_price_x96,
2084 NULL::TEXT as swap_liquidity,
2085 NULL::INT AS swap_tick,
2086 amount0::TEXT as swap_amount0,
2087 amount1::TEXT as swap_amount1,
2088 NULL::TEXT as position_liquidity,
2089 amount0::TEXT,
2090 amount1::TEXT,
2091 tick_lower::INT,
2092 tick_upper::INT,
2093 NULL::TEXT as liquidity_event_type,
2094 NULL::TEXT as flash_amount0,
2095 NULL::TEXT as flash_amount1,
2096 NULL::TEXT as flash_paid0,
2097 NULL::TEXT as flash_paid1
2098 FROM pool_collect_event
2099 WHERE chain_id = $1 AND pool_address = $2
2100 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2101 UNION ALL
2102 (SELECT
2103 'flash' as event_type,
2104 chain_id,
2105 pool_address,
2106 block,
2107 transaction_hash,
2108 transaction_index,
2109 log_index,
2110 sender,
2111 recipient,
2112 NULL::TEXT as owner,
2113 NULL::TEXT as side,
2114 NULL::TEXT as size,
2115 NULL::TEXT as price,
2116 NULL::TEXT as sqrt_price_x96,
2117 NULL::TEXT as swap_liquidity,
2118 NULL::INT AS swap_tick,
2119 NULL::TEXT as swap_amount0,
2120 NULL::TEXT as swap_amount1,
2121 NULL::TEXT as position_liquidity,
2122 NULL::TEXT as amount0,
2123 NULL::TEXT as amount1,
2124 NULL::INT as tick_lower,
2125 NULL::INT as tick_upper,
2126 NULL::TEXT as liquidity_event_type,
2127 amount0::TEXT as flash_amount0,
2128 amount1::TEXT as flash_amount1,
2129 paid0::TEXT as flash_paid0,
2130 paid1::TEXT as flash_paid1
2131 FROM pool_flash_event
2132 WHERE chain_id = $1 AND pool_address = $2
2133 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2134 ORDER BY block, transaction_index, log_index";
2135
2136 let query = if let Some(pos) = from_position {
2138 sqlx::query(QUERY_FROM_POSITION)
2139 .bind(chain.chain_id as i32)
2140 .bind(pool_address.to_string())
2141 .bind(pos.number as i64)
2142 .bind(pos.transaction_index as i32)
2143 .bind(pos.log_index as i32)
2144 .fetch(&self.pool)
2145 } else {
2146 sqlx::query(QUERY_ALL)
2147 .bind(chain.chain_id as i32)
2148 .bind(pool_address.to_string())
2149 .fetch(&self.pool)
2150 };
2151
2152 let stream = query.map(move |row_result| match row_result {
2154 Ok(row) => {
2155 transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2156 .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e))
2157 }
2158 Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {}", e)),
2159 });
2160
2161 Box::pin(stream)
2162 }
2163}