1use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21 defi::{
22 Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
23 data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24 pool_analysis::{
25 position::PoolPosition,
26 snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
27 },
28 tick_map::tick::PoolTick,
29 validation::validate_address,
30 },
31 identifiers::InstrumentId,
32};
33use rust_decimal::Decimal;
34use sqlx::{PgPool, Row, postgres::PgConnectOptions};
35
36use crate::{
37 cache::{
38 consistency::CachedBlocksConsistencyStatus,
39 copy::PostgresCopyHandler,
40 rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
41 types::{U128Pg, U256Pg},
42 },
43 events::initialize::InitializeEvent,
44};
45
46#[derive(Debug)]
48pub struct BlockchainCacheDatabase {
49 pool: PgPool,
51}
52
53impl BlockchainCacheDatabase {
54 pub async fn init(pg_options: PgConnectOptions) -> Self {
60 let pool = sqlx::postgres::PgPoolOptions::new()
61 .max_connections(32) .min_connections(5) .acquire_timeout(std::time::Duration::from_secs(3))
64 .connect_with(pg_options)
65 .await
66 .expect("Error connecting to Postgres");
67 Self { pool }
68 }
69
70 pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
76 sqlx::query(
77 r"
78 INSERT INTO chain (
79 chain_id, name
80 ) VALUES ($1,$2)
81 ON CONFLICT (chain_id)
82 DO NOTHING
83 ",
84 )
85 .bind(chain.chain_id as i32)
86 .bind(chain.name.to_string())
87 .execute(&self.pool)
88 .await
89 .map(|_| ())
90 .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
91 }
92
93 pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
100 let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
101 .bind(chain.chain_id as i32)
102 .fetch_one(&self.pool)
103 .await
104 .map_err(|e| {
105 anyhow::anyhow!(
106 "Failed to call create_block_partition for chain {}: {e}",
107 chain.chain_id
108 )
109 })?;
110
111 Ok(result.0)
112 }
113
114 pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
121 let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
122 .bind(chain.chain_id as i32)
123 .fetch_one(&self.pool)
124 .await
125 .map_err(|e| {
126 anyhow::anyhow!(
127 "Failed to call create_token_partition for chain {}: {e}",
128 chain.chain_id
129 )
130 })?;
131
132 Ok(result.0)
133 }
134
135 pub async fn get_block_consistency_status(
141 &self,
142 chain: &Chain,
143 ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
144 tracing::info!("Fetching block consistency status");
145
146 let result: (i64, i64) = sqlx::query_as(
147 r"
148 SELECT
149 COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
150 get_last_continuous_block($1) as last_continuous_block
151 "
152 )
153 .bind(chain.chain_id as i32)
154 .fetch_one(&self.pool)
155 .await
156 .map_err(|e| {
157 anyhow::anyhow!(
158 "Failed to get block info for chain {}: {}",
159 chain.chain_id,
160 e
161 )
162 })?;
163
164 Ok(CachedBlocksConsistencyStatus::new(
165 result.0 as u64,
166 result.1 as u64,
167 ))
168 }
169
170 pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
176 sqlx::query(
177 r"
178 INSERT INTO block (
179 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
180 base_fee_per_gas, blob_gas_used, excess_blob_gas,
181 l1_gas_price, l1_gas_used, l1_fee_scalar
182 ) VALUES (
183 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
184 )
185 ON CONFLICT (chain_id, number)
186 DO UPDATE
187 SET
188 hash = $3,
189 parent_hash = $4,
190 miner = $5,
191 gas_limit = $6,
192 gas_used = $7,
193 timestamp = $8,
194 base_fee_per_gas = $9,
195 blob_gas_used = $10,
196 excess_blob_gas = $11,
197 l1_gas_price = $12,
198 l1_gas_used = $13,
199 l1_fee_scalar = $14
200 ",
201 )
202 .bind(chain_id as i32)
203 .bind(block.number as i64)
204 .bind(block.hash.as_str())
205 .bind(block.parent_hash.as_str())
206 .bind(block.miner.as_str())
207 .bind(block.gas_limit as i64)
208 .bind(block.gas_used as i64)
209 .bind(block.timestamp.to_string())
210 .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
211 .bind(block.blob_gas_used.as_ref().map(U256::to_string))
212 .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
213 .bind(block.l1_gas_price.as_ref().map(U256::to_string))
214 .bind(block.l1_gas_used.map(|v| v as i64))
215 .bind(block.l1_fee_scalar.map(|v| v as i64))
216 .execute(&self.pool)
217 .await
218 .map(|_| ())
219 .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
220 }
221
222 pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
228 if blocks.is_empty() {
229 return Ok(());
230 }
231
232 let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
234 let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
235 let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
236 let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
237 let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
238 let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
239 let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
240 let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
241 let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242 let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243 let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
244 let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
245 let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
246
247 for block in blocks {
249 numbers.push(block.number as i64);
250 hashes.push(block.hash.clone());
251 parent_hashes.push(block.parent_hash.clone());
252 miners.push(block.miner.to_string());
253 gas_limits.push(block.gas_limit as i64);
254 gas_useds.push(block.gas_used as i64);
255 timestamps.push(block.timestamp.to_string());
256 base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
257 blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
258 excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
259 l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
260 l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
261 l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
262 }
263
264 sqlx::query(
266 r"
267 INSERT INTO block (
268 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
269 base_fee_per_gas, blob_gas_used, excess_blob_gas,
270 l1_gas_price, l1_gas_used, l1_fee_scalar
271 )
272 SELECT
273 $1, *
274 FROM UNNEST(
275 $2::int8[], $3::text[], $4::text[], $5::text[],
276 $6::int8[], $7::int8[], $8::text[],
277 $9::text[], $10::text[], $11::text[],
278 $12::text[], $13::int8[], $14::int8[]
279 )
280 ON CONFLICT (chain_id, number) DO NOTHING
281 ",
282 )
283 .bind(chain_id as i32)
284 .bind(&numbers[..])
285 .bind(&hashes[..])
286 .bind(&parent_hashes[..])
287 .bind(&miners[..])
288 .bind(&gas_limits[..])
289 .bind(&gas_useds[..])
290 .bind(×tamps[..])
291 .bind(&base_fee_per_gases as &[Option<String>])
292 .bind(&blob_gas_useds as &[Option<String>])
293 .bind(&excess_blob_gases as &[Option<String>])
294 .bind(&l1_gas_prices as &[Option<String>])
295 .bind(&l1_gas_useds as &[Option<i64>])
296 .bind(&l1_fee_scalars as &[Option<i64>])
297 .execute(&self.pool)
298 .await
299 .map(|_| ())
300 .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
301 }
302
303 pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
312 let copy_handler = PostgresCopyHandler::new(&self.pool);
313 copy_handler.copy_blocks(chain_id, blocks).await
314 }
315
316 pub async fn add_tokens_copy(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
322 let copy_handler = PostgresCopyHandler::new(&self.pool);
323 copy_handler.copy_tokens(chain_id, tokens).await
324 }
325
326 pub async fn add_pools_copy(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
332 let copy_handler = PostgresCopyHandler::new(&self.pool);
333 copy_handler.copy_pools(chain_id, pools).await
334 }
335
336 pub async fn add_pool_swaps_copy(
345 &self,
346 chain_id: u32,
347 swaps: &[PoolSwap],
348 ) -> anyhow::Result<()> {
349 let copy_handler = PostgresCopyHandler::new(&self.pool);
350 copy_handler.copy_pool_swaps(chain_id, swaps).await
351 }
352
353 pub async fn add_pool_liquidity_updates_copy(
362 &self,
363 chain_id: u32,
364 updates: &[PoolLiquidityUpdate],
365 ) -> anyhow::Result<()> {
366 let copy_handler = PostgresCopyHandler::new(&self.pool);
367 copy_handler
368 .copy_pool_liquidity_updates(chain_id, updates)
369 .await
370 }
371
372 pub async fn copy_pool_fee_collects_batch(
381 &self,
382 chain_id: u32,
383 collects: &[PoolFeeCollect],
384 ) -> anyhow::Result<()> {
385 let copy_handler = PostgresCopyHandler::new(&self.pool);
386 copy_handler.copy_pool_collects(chain_id, collects).await
387 }
388
389 pub async fn load_block_timestamps(
395 &self,
396 chain: SharedChain,
397 from_block: u64,
398 ) -> anyhow::Result<Vec<BlockTimestampRow>> {
399 sqlx::query_as::<_, BlockTimestampRow>(
400 r"
401 SELECT
402 number,
403 timestamp
404 FROM block
405 WHERE chain_id = $1 AND number >= $2
406 ORDER BY number ASC
407 ",
408 )
409 .bind(chain.chain_id as i32)
410 .bind(from_block as i64)
411 .fetch_all(&self.pool)
412 .await
413 .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
414 }
415
416 pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
422 sqlx::query(
423 r"
424 INSERT INTO dex (
425 chain_id, name, factory_address, creation_block
426 ) VALUES ($1, $2, $3, $4)
427 ON CONFLICT (chain_id, name)
428 DO UPDATE
429 SET
430 factory_address = $3,
431 creation_block = $4
432 ",
433 )
434 .bind(dex.chain.chain_id as i32)
435 .bind(dex.name.to_string())
436 .bind(dex.factory.to_string())
437 .bind(dex.factory_creation_block as i64)
438 .execute(&self.pool)
439 .await
440 .map(|_| ())
441 .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
442 }
443
444 pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
450 sqlx::query(
451 r"
452 INSERT INTO pool (
453 chain_id, address, dex_name, creation_block,
454 token0_chain, token0_address,
455 token1_chain, token1_address,
456 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
457 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
458 ON CONFLICT (chain_id, address)
459 DO UPDATE
460 SET
461 dex_name = $3,
462 creation_block = $4,
463 token0_chain = $5,
464 token0_address = $6,
465 token1_chain = $7,
466 token1_address = $8,
467 fee = $9,
468 tick_spacing = $10,
469 initial_tick = $11,
470 initial_sqrt_price_x96 = $12
471 ",
472 )
473 .bind(pool.chain.chain_id as i32)
474 .bind(pool.address.to_string())
475 .bind(pool.dex.name.to_string())
476 .bind(pool.creation_block as i64)
477 .bind(pool.token0.chain.chain_id as i32)
478 .bind(pool.token0.address.to_string())
479 .bind(pool.token1.chain.chain_id as i32)
480 .bind(pool.token1.address.to_string())
481 .bind(pool.fee.map(|fee| fee as i32))
482 .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
483 .bind(pool.initial_tick)
484 .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
485 .execute(&self.pool)
486 .await
487 .map(|_| ())
488 .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
489 }
490
491 pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
497 if pools.is_empty() {
498 return Ok(());
499 }
500
501 let len = pools.len();
503 let mut addresses: Vec<String> = Vec::with_capacity(len);
504 let mut dex_names: Vec<String> = Vec::with_capacity(len);
505 let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
506 let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
507 let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
508 let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
509 let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
510 let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
511 let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
512 let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
513 let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
514 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
515
516 for pool in pools {
518 chain_ids.push(pool.chain.chain_id as i32);
519 addresses.push(pool.address.to_string());
520 dex_names.push(pool.dex.name.to_string());
521 creation_blocks.push(pool.creation_block as i64);
522 token0_chains.push(pool.token0.chain.chain_id as i32);
523 token0_addresses.push(pool.token0.address.to_string());
524 token1_chains.push(pool.token1.chain.chain_id as i32);
525 token1_addresses.push(pool.token1.address.to_string());
526 fees.push(pool.fee.map(|fee| fee as i32));
527 tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
528 initial_ticks.push(pool.initial_tick);
529 initial_sqrt_price_x96s
530 .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
531 }
532
533 sqlx::query(
535 r"
536 INSERT INTO pool (
537 chain_id, address, dex_name, creation_block,
538 token0_chain, token0_address,
539 token1_chain, token1_address,
540 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
541 )
542 SELECT *
543 FROM UNNEST(
544 $1::int4[], $2::text[], $3::text[], $4::int8[],
545 $5::int4[], $6::text[], $7::int4[], $8::text[],
546 $9::int4[], $10::int4[], $11::int4[], $12::text[]
547 )
548 ON CONFLICT (chain_id, address) DO NOTHING
549 ",
550 )
551 .bind(&chain_ids[..])
552 .bind(&addresses[..])
553 .bind(&dex_names[..])
554 .bind(&creation_blocks[..])
555 .bind(&token0_chains[..])
556 .bind(&token0_addresses[..])
557 .bind(&token1_chains[..])
558 .bind(&token1_addresses[..])
559 .bind(&fees[..])
560 .bind(&tick_spacings[..])
561 .bind(&initial_ticks[..])
562 .bind(&initial_sqrt_price_x96s[..])
563 .execute(&self.pool)
564 .await
565 .map(|_| ())
566 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
567 }
568
569 pub async fn add_pool_swaps_batch(
575 &self,
576 chain_id: u32,
577 swaps: &[PoolSwap],
578 ) -> anyhow::Result<()> {
579 if swaps.is_empty() {
580 return Ok(());
581 }
582
583 let len = swaps.len();
585 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
586 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
587 let mut blocks: Vec<i64> = Vec::with_capacity(len);
588 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
589 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
590 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
591 let mut senders: Vec<String> = Vec::with_capacity(len);
592 let mut recipients: Vec<String> = Vec::with_capacity(len);
593 let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
594 let mut liquidities: Vec<String> = Vec::with_capacity(len);
595 let mut ticks: Vec<i32> = Vec::with_capacity(len);
596 let mut amount0s: Vec<String> = Vec::with_capacity(len);
597 let mut amount1s: Vec<String> = Vec::with_capacity(len);
598 let mut order_sides: Vec<Option<String>> = Vec::with_capacity(len);
599 let mut base_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
600 let mut quote_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
601 let mut spot_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
602 let mut execution_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
603
604 for swap in swaps {
606 chain_ids.push(chain_id as i32);
607 pool_addresses.push(swap.pool_address.to_string());
608 blocks.push(swap.block as i64);
609 transaction_hashes.push(swap.transaction_hash.clone());
610 transaction_indices.push(swap.transaction_index as i32);
611 log_indices.push(swap.log_index as i32);
612 senders.push(swap.sender.to_string());
613 recipients.push(swap.recipient.to_string());
614 sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
615 liquidities.push(swap.liquidity.to_string());
616 ticks.push(swap.tick);
617 amount0s.push(swap.amount0.to_string());
618 amount1s.push(swap.amount1.to_string());
619
620 if let Some(ref trade_info) = swap.trade_info {
622 order_sides.push(Some(trade_info.order_side.to_string()));
623 base_quantities.push(Some(trade_info.quantity_base.as_decimal()));
624 quote_quantities.push(Some(trade_info.quantity_quote.as_decimal()));
625 spot_prices.push(Some(trade_info.spot_price.as_decimal()));
626 execution_prices.push(Some(trade_info.execution_price.as_decimal()));
627 } else {
628 order_sides.push(None);
629 base_quantities.push(None);
630 quote_quantities.push(None);
631 spot_prices.push(None);
632 execution_prices.push(None);
633 }
634 }
635
636 sqlx::query(
638 r"
639 INSERT INTO pool_swap_event (
640 chain_id, pool_address, block, transaction_hash, transaction_index,
641 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
642 order_side, base_quantity, quote_quantity, spot_price, execution_price
643 )
644 SELECT
645 chain_id, pool_address, block, transaction_hash, transaction_index, log_index, sender, recipient,
646 sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256,
647 order_side, base_quantity, quote_quantity, spot_price, execution_price
648 FROM UNNEST(
649 $1::INT[], $2::TEXT[], $3::BIGINT[], $4::TEXT[], $5::INT[], $6::INT[],
650 $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::INT[], $12::TEXT[], $13::TEXT[],
651 $14::TEXT[], $15, $16, $17, $18
652 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
653 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
654 order_side, base_quantity, quote_quantity, spot_price, execution_price)
655 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
656 ",
657 )
658 .bind(&chain_ids[..])
659 .bind(&pool_addresses[..])
660 .bind(&blocks[..])
661 .bind(&transaction_hashes[..])
662 .bind(&transaction_indices[..])
663 .bind(&log_indices[..])
664 .bind(&senders[..])
665 .bind(&recipients[..])
666 .bind(&sqrt_price_x96s[..])
667 .bind(&liquidities[..])
668 .bind(&ticks[..])
669 .bind(&amount0s[..])
670 .bind(&amount1s[..])
671 .bind(&order_sides[..])
672 .bind(&base_quantities[..])
673 .bind("e_quantities[..])
674 .bind(&spot_prices[..])
675 .bind(&execution_prices[..])
676 .execute(&self.pool)
677 .await
678 .map(|_| ())
679 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
680 }
681
682 pub async fn add_pool_liquidity_updates_batch(
688 &self,
689 chain_id: u32,
690 updates: &[PoolLiquidityUpdate],
691 ) -> anyhow::Result<()> {
692 if updates.is_empty() {
693 return Ok(());
694 }
695
696 let len = updates.len();
698 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
699 let mut blocks: Vec<i64> = Vec::with_capacity(len);
700 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
701 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
702 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
703 let mut event_types: Vec<String> = Vec::with_capacity(len);
704 let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
705 let mut owners: Vec<String> = Vec::with_capacity(len);
706 let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
707 let mut amount0s: Vec<String> = Vec::with_capacity(len);
708 let mut amount1s: Vec<String> = Vec::with_capacity(len);
709 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
710 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
711 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
712
713 for update in updates {
715 chain_ids.push(chain_id as i32);
716 pool_addresses.push(update.pool_address.to_string());
717 blocks.push(update.block as i64);
718 transaction_hashes.push(update.transaction_hash.clone());
719 transaction_indices.push(update.transaction_index as i32);
720 log_indices.push(update.log_index as i32);
721 event_types.push(update.kind.to_string());
722 senders.push(update.sender.map(|s| s.to_string()));
723 owners.push(update.owner.to_string());
724 position_liquidities.push(update.position_liquidity.to_string());
725 amount0s.push(update.amount0.to_string());
726 amount1s.push(update.amount1.to_string());
727 tick_lowers.push(update.tick_lower);
728 tick_uppers.push(update.tick_upper);
729 }
730
731 sqlx::query(
733 r"
734 INSERT INTO pool_liquidity_event (
735 chain_id, pool_address, block, transaction_hash, transaction_index,
736 log_index, event_type, sender, owner, position_liquidity,
737 amount0, amount1, tick_lower, tick_upper
738 )
739 SELECT
740 chain_id, pool_address, block, transaction_hash, transaction_index,
741 log_index, event_type, sender, owner, position_liquidity::u128,
742 amount0::U256, amount1::U256, tick_lower, tick_upper
743 FROM UNNEST(
744 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
745 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[],
746 $11::TEXT[], $12::TEXT[], $13::INT[], $14::INT[]
747 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
748 log_index, event_type, sender, owner, position_liquidity,
749 amount0, amount1, tick_lower, tick_upper)
750 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
751 ",
752 )
753 .bind(&chain_ids[..])
754 .bind(&pool_addresses[..])
755 .bind(&blocks[..])
756 .bind(&transaction_hashes[..])
757 .bind(&transaction_indices[..])
758 .bind(&log_indices[..])
759 .bind(&event_types[..])
760 .bind(&senders[..])
761 .bind(&owners[..])
762 .bind(&position_liquidities[..])
763 .bind(&amount0s[..])
764 .bind(&amount1s[..])
765 .bind(&tick_lowers[..])
766 .bind(&tick_uppers[..])
767 .execute(&self.pool)
768 .await
769 .map(|_| ())
770 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
771 }
772
773 pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
779 sqlx::query(
780 r"
781 INSERT INTO token (
782 chain_id, address, name, symbol, decimals
783 ) VALUES ($1, $2, $3, $4, $5)
784 ON CONFLICT (chain_id, address)
785 DO UPDATE
786 SET
787 name = $3,
788 symbol = $4,
789 decimals = $5
790 ",
791 )
792 .bind(token.chain.chain_id as i32)
793 .bind(token.address.to_string())
794 .bind(token.name.as_str())
795 .bind(token.symbol.as_str())
796 .bind(i32::from(token.decimals))
797 .execute(&self.pool)
798 .await
799 .map(|_| ())
800 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
801 }
802
803 pub async fn add_invalid_token(
809 &self,
810 chain_id: u32,
811 address: &Address,
812 error_string: &str,
813 ) -> anyhow::Result<()> {
814 sqlx::query(
815 r"
816 INSERT INTO token (
817 chain_id, address, error
818 ) VALUES ($1, $2, $3)
819 ON CONFLICT (chain_id, address)
820 DO NOTHING;
821 ",
822 )
823 .bind(chain_id as i32)
824 .bind(address.to_string())
825 .bind(error_string)
826 .execute(&self.pool)
827 .await
828 .map(|_| ())
829 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
830 }
831
832 pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
838 let (order_side, base_quantity, quote_quantity, spot_price, execution_price) =
840 if let Some(ref trade_info) = swap.trade_info {
841 (
842 Some(trade_info.order_side.to_string()),
843 Some(trade_info.quantity_base.as_decimal()),
844 Some(trade_info.quantity_quote.as_decimal()),
845 Some(trade_info.spot_price.as_decimal()),
846 Some(trade_info.execution_price.as_decimal()),
847 )
848 } else {
849 (None, None, None, None, None)
850 };
851
852 sqlx::query(
853 r"
854 INSERT INTO pool_swap_event (
855 chain_id, pool_address, block, transaction_hash, transaction_index,
856 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
857 order_side, base_quantity, quote_quantity, spot_price, execution_price
858 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::U160, $10::U128, $11, $12::I256, $13::I256, $14, $15, $16, $17, $18)
859 ON CONFLICT (chain_id, transaction_hash, log_index)
860 DO NOTHING
861 ",
862 )
863 .bind(chain_id as i32)
864 .bind(swap.pool_address.to_string())
865 .bind(swap.block as i64)
866 .bind(swap.transaction_hash.as_str())
867 .bind(swap.transaction_index as i32)
868 .bind(swap.log_index as i32)
869 .bind(swap.sender.to_string())
870 .bind(swap.recipient.to_string())
871 .bind(swap.sqrt_price_x96.to_string())
872 .bind(swap.liquidity.to_string())
873 .bind(swap.tick)
874 .bind(swap.amount0.to_string())
875 .bind(swap.amount1.to_string())
876 .bind(order_side)
877 .bind(base_quantity)
878 .bind(quote_quantity)
879 .bind(spot_price)
880 .bind(execution_price)
881 .execute(&self.pool)
882 .await
883 .map(|_| ())
884 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
885 }
886
887 pub async fn add_pool_liquidity_update(
893 &self,
894 chain_id: u32,
895 liquidity_update: &PoolLiquidityUpdate,
896 ) -> anyhow::Result<()> {
897 sqlx::query(
898 r"
899 INSERT INTO pool_liquidity_event (
900 chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
901 event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
902 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
903 ON CONFLICT (chain_id, transaction_hash, log_index)
904 DO NOTHING
905 ",
906 )
907 .bind(chain_id as i32)
908 .bind(liquidity_update.pool_address.to_string())
909 .bind(liquidity_update.block as i64)
910 .bind(liquidity_update.transaction_hash.as_str())
911 .bind(liquidity_update.transaction_index as i32)
912 .bind(liquidity_update.log_index as i32)
913 .bind(liquidity_update.kind.to_string())
914 .bind(liquidity_update.sender.map(|sender| sender.to_string()))
915 .bind(liquidity_update.owner.to_string())
916 .bind(U128Pg(liquidity_update.position_liquidity))
917 .bind(U256Pg(liquidity_update.amount0))
918 .bind(U256Pg(liquidity_update.amount1))
919 .bind(liquidity_update.tick_lower)
920 .bind(liquidity_update.tick_upper)
921 .execute(&self.pool)
922 .await
923 .map(|_| ())
924 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
925 }
926
927 pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
936 sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
937 .bind(chain.chain_id as i32)
938 .fetch_all(&self.pool)
939 .await
940 .map(|rows| {
941 rows.into_iter()
942 .map(|token_row| {
943 Token::new(
944 chain.clone(),
945 token_row.address,
946 token_row.name,
947 token_row.symbol,
948 token_row.decimals as u8,
949 )
950 })
951 .collect::<Vec<_>>()
952 })
953 .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
954 }
955
956 pub async fn load_invalid_token_addresses(
962 &self,
963 chain_id: u32,
964 ) -> anyhow::Result<Vec<Address>> {
965 sqlx::query_as::<_, (String,)>(
966 "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
967 )
968 .bind(chain_id as i32)
969 .fetch_all(&self.pool)
970 .await?
971 .into_iter()
972 .map(|(address,)| validate_address(&address))
973 .collect::<Result<Vec<_>, _>>()
974 .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
975 }
976
977 pub async fn load_pools(
983 &self,
984 chain: SharedChain,
985 dex_id: &str,
986 ) -> anyhow::Result<Vec<PoolRow>> {
987 sqlx::query_as::<_, PoolRow>(
988 r"
989 SELECT
990 address,
991 dex_name,
992 creation_block,
993 token0_chain,
994 token0_address,
995 token1_chain,
996 token1_address,
997 fee,
998 tick_spacing,
999 initial_tick,
1000 initial_sqrt_price_x96
1001 FROM pool
1002 WHERE chain_id = $1 AND dex_name = $2
1003 ORDER BY creation_block ASC
1004 ",
1005 )
1006 .bind(chain.chain_id as i32)
1007 .bind(dex_id)
1008 .fetch_all(&self.pool)
1009 .await
1010 .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
1011 }
1012
1013 pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
1027 if enable {
1028 tracing::info!("Enabling performance sync settings for bulk operations");
1029
1030 sqlx::query("SET synchronous_commit = OFF")
1032 .execute(&self.pool)
1033 .await
1034 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
1035
1036 sqlx::query("SET work_mem = '256MB'")
1038 .execute(&self.pool)
1039 .await
1040 .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
1041
1042 tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
1043 } else {
1044 tracing::info!("Restoring default safe database performance settings");
1045
1046 sqlx::query("SET synchronous_commit = ON")
1048 .execute(&self.pool)
1049 .await
1050 .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
1051
1052 sqlx::query("RESET work_mem")
1054 .execute(&self.pool)
1055 .await
1056 .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
1057 }
1058
1059 Ok(())
1060 }
1061
1062 pub async fn update_dex_last_synced_block(
1068 &self,
1069 chain_id: u32,
1070 dex: &DexType,
1071 block_number: u64,
1072 ) -> anyhow::Result<()> {
1073 sqlx::query(
1074 r"
1075 UPDATE dex
1076 SET last_full_sync_pools_block_number = $3
1077 WHERE chain_id = $1 AND name = $2
1078 ",
1079 )
1080 .bind(chain_id as i32)
1081 .bind(dex.to_string())
1082 .bind(block_number as i64)
1083 .execute(&self.pool)
1084 .await
1085 .map(|_| ())
1086 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1087 }
1088
1089 pub async fn update_pool_last_synced_block(
1095 &self,
1096 chain_id: u32,
1097 dex: &DexType,
1098 pool_address: &Address,
1099 block_number: u64,
1100 ) -> anyhow::Result<()> {
1101 sqlx::query(
1102 r"
1103 UPDATE pool
1104 SET last_full_sync_block_number = $4
1105 WHERE chain_id = $1
1106 AND dex_name = $2
1107 AND address = $3
1108 ",
1109 )
1110 .bind(chain_id as i32)
1111 .bind(dex.to_string())
1112 .bind(pool_address.to_string())
1113 .bind(block_number as i64)
1114 .execute(&self.pool)
1115 .await
1116 .map(|_| ())
1117 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1118 }
1119
1120 pub async fn get_dex_last_synced_block(
1126 &self,
1127 chain_id: u32,
1128 dex: &DexType,
1129 ) -> anyhow::Result<Option<u64>> {
1130 let result = sqlx::query_as::<_, (Option<i64>,)>(
1131 r#"
1132 SELECT
1133 last_full_sync_pools_block_number
1134 FROM dex
1135 WHERE chain_id = $1
1136 AND name = $2
1137 "#,
1138 )
1139 .bind(chain_id as i32)
1140 .bind(dex.to_string())
1141 .fetch_optional(&self.pool)
1142 .await
1143 .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1144
1145 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1146 }
1147
1148 pub async fn get_pool_last_synced_block(
1154 &self,
1155 chain_id: u32,
1156 dex: &DexType,
1157 pool_address: &Address,
1158 ) -> anyhow::Result<Option<u64>> {
1159 let result = sqlx::query_as::<_, (Option<i64>,)>(
1160 r#"
1161 SELECT
1162 last_full_sync_block_number
1163 FROM pool
1164 WHERE chain_id = $1
1165 AND dex_name = $2
1166 AND address = $3
1167 "#,
1168 )
1169 .bind(chain_id as i32)
1170 .bind(dex.to_string())
1171 .bind(pool_address.to_string())
1172 .fetch_optional(&self.pool)
1173 .await
1174 .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1175
1176 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1177 }
1178
1179 pub async fn get_table_last_block(
1186 &self,
1187 chain_id: u32,
1188 table_name: &str,
1189 pool_address: &Address,
1190 ) -> anyhow::Result<Option<u64>> {
1191 let query = format!(
1192 "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
1193 table_name
1194 );
1195 let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1196 .bind(chain_id as i32)
1197 .bind(pool_address.to_string())
1198 .fetch_optional(&self.pool)
1199 .await
1200 .map_err(|e| {
1201 anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
1202 })?;
1203
1204 Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1205 }
1206
1207 pub async fn add_pool_collects_batch(
1213 &self,
1214 chain_id: u32,
1215 collects: &[PoolFeeCollect],
1216 ) -> anyhow::Result<()> {
1217 if collects.is_empty() {
1218 return Ok(());
1219 }
1220
1221 let len = collects.len();
1223 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1224 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1225 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1226 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1227 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1228 let mut owners: Vec<String> = Vec::with_capacity(len);
1229 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1230 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1231 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1232 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1233 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1234
1235 for collect in collects {
1237 chain_ids.push(chain_id as i32);
1238 pool_addresses.push(collect.pool_address.to_string());
1239 blocks.push(collect.block as i64);
1240 transaction_hashes.push(collect.transaction_hash.clone());
1241 transaction_indices.push(collect.transaction_index as i32);
1242 log_indices.push(collect.log_index as i32);
1243 owners.push(collect.owner.to_string());
1244 amount0s.push(collect.amount0.to_string());
1245 amount1s.push(collect.amount1.to_string());
1246 tick_lowers.push(collect.tick_lower);
1247 tick_uppers.push(collect.tick_upper);
1248 }
1249
1250 sqlx::query(
1252 r"
1253 INSERT INTO pool_collect_event (
1254 chain_id, pool_address, block, transaction_hash, transaction_index,
1255 log_index, owner, amount0, amount1, tick_lower, tick_upper
1256 )
1257 SELECT
1258 chain_id, pool_address, block, transaction_hash, transaction_index,
1259 log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1260 FROM UNNEST(
1261 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1262 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::INT[], $11::INT[]
1263 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1264 log_index, owner, amount0, amount1, tick_lower, tick_upper)
1265 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1266 ",
1267 )
1268 .bind(&chain_ids[..])
1269 .bind(&pool_addresses[..])
1270 .bind(&blocks[..])
1271 .bind(&transaction_hashes[..])
1272 .bind(&transaction_indices[..])
1273 .bind(&log_indices[..])
1274 .bind(&owners[..])
1275 .bind(&amount0s[..])
1276 .bind(&amount1s[..])
1277 .bind(&tick_lowers[..])
1278 .bind(&tick_uppers[..])
1279 .execute(&self.pool)
1280 .await
1281 .map(|_| ())
1282 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1283 }
1284
1285 pub async fn add_pool_flash_batch(
1291 &self,
1292 chain_id: u32,
1293 flash_events: &[PoolFlash],
1294 ) -> anyhow::Result<()> {
1295 if flash_events.is_empty() {
1296 return Ok(());
1297 }
1298
1299 let len = flash_events.len();
1301 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1302 let mut blocks: Vec<i64> = Vec::with_capacity(len);
1303 let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1304 let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1305 let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1306 let mut senders: Vec<String> = Vec::with_capacity(len);
1307 let mut recipients: Vec<String> = Vec::with_capacity(len);
1308 let mut amount0s: Vec<String> = Vec::with_capacity(len);
1309 let mut amount1s: Vec<String> = Vec::with_capacity(len);
1310 let mut paid0s: Vec<String> = Vec::with_capacity(len);
1311 let mut paid1s: Vec<String> = Vec::with_capacity(len);
1312 let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1313
1314 for flash in flash_events {
1316 chain_ids.push(chain_id as i32);
1317 pool_addresses.push(flash.pool_address.to_string());
1318 blocks.push(flash.block as i64);
1319 transaction_hashes.push(flash.transaction_hash.clone());
1320 transaction_indices.push(flash.transaction_index as i32);
1321 log_indices.push(flash.log_index as i32);
1322 senders.push(flash.sender.to_string());
1323 recipients.push(flash.recipient.to_string());
1324 amount0s.push(flash.amount0.to_string());
1325 amount1s.push(flash.amount1.to_string());
1326 paid0s.push(flash.paid0.to_string());
1327 paid1s.push(flash.paid1.to_string());
1328 }
1329
1330 sqlx::query(
1332 r"
1333 INSERT INTO pool_flash_event (
1334 chain_id, pool_address, block, transaction_hash, transaction_index,
1335 log_index, sender, recipient, amount0, amount1, paid0, paid1
1336 )
1337 SELECT
1338 chain_id, pool_address, block, transaction_hash, transaction_index,
1339 log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1340 FROM UNNEST(
1341 $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1342 $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[]
1343 ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1344 log_index, sender, recipient, amount0, amount1, paid0, paid1)
1345 ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1346 ",
1347 )
1348 .bind(&chain_ids[..])
1349 .bind(&pool_addresses[..])
1350 .bind(&blocks[..])
1351 .bind(&transaction_hashes[..])
1352 .bind(&transaction_indices[..])
1353 .bind(&log_indices[..])
1354 .bind(&senders[..])
1355 .bind(&recipients[..])
1356 .bind(&amount0s[..])
1357 .bind(&amount1s[..])
1358 .bind(&paid0s[..])
1359 .bind(&paid1s[..])
1360 .execute(&self.pool)
1361 .await
1362 .map(|_| ())
1363 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1364 }
1365
1366 pub async fn add_pool_snapshot(
1372 &self,
1373 chain_id: u32,
1374 pool_address: &Address,
1375 snapshot: &PoolSnapshot,
1376 ) -> anyhow::Result<()> {
1377 sqlx::query(
1378 r"
1379 INSERT INTO pool_snapshot (
1380 chain_id, pool_address, block, transaction_index, log_index, transaction_hash,
1381 current_tick, price_sqrt_ratio_x96, liquidity,
1382 protocol_fees_token0, protocol_fees_token1, fee_protocol,
1383 fee_growth_global_0, fee_growth_global_1,
1384 total_amount0_deposited, total_amount1_deposited,
1385 total_amount0_collected, total_amount1_collected,
1386 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1387 liquidity_utilization_rate
1388 ) VALUES (
1389 $1, $2, $3, $4, $5, $6,
1390 $7, $8::U160, $9::U128, $10::U256, $11::U256, $12,
1391 $13::U256, $14::U256, $15::U256, $16::U256, $17::U256, $18::U256,
1392 $19, $20, $21, $22, $23, $24
1393 )
1394 ON CONFLICT (chain_id, pool_address, block, transaction_index, log_index)
1395 DO NOTHING
1396 ",
1397 )
1398 .bind(chain_id as i32)
1399 .bind(pool_address.to_string())
1400 .bind(snapshot.block_position.number as i64)
1401 .bind(snapshot.block_position.transaction_index as i32)
1402 .bind(snapshot.block_position.log_index as i32)
1403 .bind(snapshot.block_position.transaction_hash.clone())
1404 .bind(snapshot.state.current_tick)
1405 .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1406 .bind(snapshot.state.liquidity.to_string())
1407 .bind(snapshot.state.protocol_fees_token0.to_string())
1408 .bind(snapshot.state.protocol_fees_token1.to_string())
1409 .bind(snapshot.state.fee_protocol as i16)
1410 .bind(snapshot.state.fee_growth_global_0.to_string())
1411 .bind(snapshot.state.fee_growth_global_1.to_string())
1412 .bind(snapshot.analytics.total_amount0_deposited.to_string())
1413 .bind(snapshot.analytics.total_amount1_deposited.to_string())
1414 .bind(snapshot.analytics.total_amount0_collected.to_string())
1415 .bind(snapshot.analytics.total_amount1_collected.to_string())
1416 .bind(snapshot.analytics.total_swaps as i32)
1417 .bind(snapshot.analytics.total_mints as i32)
1418 .bind(snapshot.analytics.total_burns as i32)
1419 .bind(snapshot.analytics.total_fee_collects as i32)
1420 .bind(snapshot.analytics.total_flashes as i32)
1421 .bind(snapshot.analytics.liquidity_utilization_rate)
1422 .execute(&self.pool)
1423 .await
1424 .map(|_| ())
1425 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1426 }
1427
1428 pub async fn add_pool_positions_batch(
1434 &self,
1435 chain_id: u32,
1436 snapshot_block: u64,
1437 snapshot_transaction_index: u32,
1438 snapshot_log_index: u32,
1439 positions: &[(Address, PoolPosition)],
1440 ) -> anyhow::Result<()> {
1441 if positions.is_empty() {
1442 return Ok(());
1443 }
1444
1445 let len = positions.len();
1447 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1448 let mut owners: Vec<String> = Vec::with_capacity(len);
1449 let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1450 let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1451 let mut liquidities: Vec<String> = Vec::with_capacity(len);
1452 let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1453 let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1454 let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1455 let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1456 let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1457 let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1458 let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1459 let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1460
1461 for (pool_address, position) in positions {
1463 pool_addresses.push(pool_address.to_string());
1464 owners.push(position.owner.to_string());
1465 tick_lowers.push(position.tick_lower);
1466 tick_uppers.push(position.tick_upper);
1467 liquidities.push(position.liquidity.to_string());
1468 fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1469 fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1470 tokens_owed_0s.push(position.tokens_owed_0.to_string());
1471 tokens_owed_1s.push(position.tokens_owed_1.to_string());
1472 total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1473 total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1474 total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1475 total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1476 }
1477
1478 sqlx::query(
1480 r"
1481 INSERT INTO pool_position (
1482 chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1483 owner, tick_lower, tick_upper,
1484 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1485 tokens_owed_0, tokens_owed_1,
1486 total_amount0_deposited, total_amount1_deposited,
1487 total_amount0_collected, total_amount1_collected
1488 )
1489 SELECT
1490 $1, pool_address, $2, $3, $4,
1491 owner, tick_lower, tick_upper,
1492 liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1493 tokens_owed_0::U128, tokens_owed_1::U128,
1494 total_amount0_deposited::U256, total_amount1_deposited::U256,
1495 total_amount0_collected::U128, total_amount1_collected::U128
1496 FROM UNNEST(
1497 $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1498 $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1499 $16::TEXT[], $17::TEXT[]
1500 ) AS t(pool_address, owner, tick_lower, tick_upper,
1501 liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1502 tokens_owed_0, tokens_owed_1,
1503 total_amount0_deposited, total_amount1_deposited,
1504 total_amount0_collected, total_amount1_collected)
1505 ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1506 DO NOTHING
1507 ",
1508 )
1509 .bind(chain_id as i32)
1510 .bind(snapshot_block as i64)
1511 .bind(snapshot_transaction_index as i32)
1512 .bind(snapshot_log_index as i32)
1513 .bind(&pool_addresses[..])
1514 .bind(&owners[..])
1515 .bind(&tick_lowers[..])
1516 .bind(&tick_uppers[..])
1517 .bind(&liquidities[..])
1518 .bind(&fee_growth_inside_0_lasts[..])
1519 .bind(&fee_growth_inside_1_lasts[..])
1520 .bind(&tokens_owed_0s[..])
1521 .bind(&tokens_owed_1s[..])
1522 .bind(&total_amount0_depositeds as &[Option<String>])
1523 .bind(&total_amount1_depositeds as &[Option<String>])
1524 .bind(&total_amount0_collecteds as &[Option<String>])
1525 .bind(&total_amount1_collecteds as &[Option<String>])
1526 .execute(&self.pool)
1527 .await
1528 .map(|_| ())
1529 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1530 }
1531
1532 pub async fn add_pool_ticks_batch(
1538 &self,
1539 chain_id: u32,
1540 snapshot_block: u64,
1541 snapshot_transaction_index: u32,
1542 snapshot_log_index: u32,
1543 ticks: &[(Address, &PoolTick)],
1544 ) -> anyhow::Result<()> {
1545 if ticks.is_empty() {
1546 return Ok(());
1547 }
1548
1549 let len = ticks.len();
1551 let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1552 let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1553 let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1554 let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1555 let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1556 let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1557 let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1558 let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1559
1560 for (pool_address, tick) in ticks {
1562 pool_addresses.push(pool_address.to_string());
1563 tick_values.push(tick.value);
1564 liquidity_grosses.push(tick.liquidity_gross.to_string());
1565 liquidity_nets.push(tick.liquidity_net.to_string());
1566 fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1567 fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1568 initializeds.push(tick.initialized);
1569 last_updated_blocks.push(tick.last_updated_block as i64);
1570 }
1571
1572 sqlx::query(
1574 r"
1575 INSERT INTO pool_tick (
1576 chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1577 tick_value, liquidity_gross, liquidity_net,
1578 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1579 )
1580 SELECT
1581 $1, pool_address, $2, $3, $4,
1582 tick_value, liquidity_gross::U128, liquidity_net::I128,
1583 fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1584 FROM UNNEST(
1585 $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1586 $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1587 ) AS t(pool_address, tick_value, liquidity_gross, liquidity_net,
1588 fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1589 ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1590 DO NOTHING
1591 ",
1592 )
1593 .bind(chain_id as i32)
1594 .bind(snapshot_block as i64)
1595 .bind(snapshot_transaction_index as i32)
1596 .bind(snapshot_log_index as i32)
1597 .bind(&pool_addresses[..])
1598 .bind(&tick_values[..])
1599 .bind(&liquidity_grosses[..])
1600 .bind(&liquidity_nets[..])
1601 .bind(&fee_growth_outside_0s[..])
1602 .bind(&fee_growth_outside_1s[..])
1603 .bind(&initializeds[..])
1604 .bind(&last_updated_blocks[..])
1605 .execute(&self.pool)
1606 .await
1607 .map(|_| ())
1608 .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1609 }
1610
1611 pub async fn update_pool_initial_price_tick(
1617 &self,
1618 chain_id: u32,
1619 initialize_event: &InitializeEvent,
1620 ) -> anyhow::Result<()> {
1621 sqlx::query(
1622 r"
1623 UPDATE pool
1624 SET
1625 initial_tick = $4,
1626 initial_sqrt_price_x96 = $5
1627 WHERE chain_id = $1
1628 AND dex_name = $2
1629 AND address = $3
1630 ",
1631 )
1632 .bind(chain_id as i32)
1633 .bind(initialize_event.dex.name.to_string())
1634 .bind(initialize_event.pool_address.to_string())
1635 .bind(initialize_event.tick)
1636 .bind(initialize_event.sqrt_price_x96.to_string())
1637 .execute(&self.pool)
1638 .await
1639 .map(|_| ())
1640 .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1641 }
1642
1643 pub async fn load_latest_valid_pool_snapshot(
1651 &self,
1652 chain_id: u32,
1653 pool_address: &Address,
1654 ) -> anyhow::Result<Option<PoolSnapshot>> {
1655 let result = sqlx::query(
1656 r"
1657 SELECT
1658 block, transaction_index, log_index, transaction_hash,
1659 current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1660 protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1661 fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1662 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1663 total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1664 total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1665 liquidity_utilization_rate,
1666 (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1667 FROM pool_snapshot
1668 WHERE chain_id = $1 AND pool_address = $2 AND is_valid = TRUE
1669 ORDER BY block DESC, transaction_index DESC, log_index DESC
1670 LIMIT 1
1671 ",
1672 )
1673 .bind(chain_id as i32)
1674 .bind(pool_address.to_string())
1675 .fetch_optional(&self.pool)
1676 .await
1677 .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {}", e))?;
1678
1679 if let Some(row) = result {
1680 let block: i64 = row.get("block");
1682 let transaction_index: i32 = row.get("transaction_index");
1683 let log_index: i32 = row.get("log_index");
1684 let transaction_hash: String = row.get("transaction_hash");
1685
1686 let block_position = nautilus_model::defi::data::block::BlockPosition::new(
1687 block as u64,
1688 transaction_hash,
1689 transaction_index as u32,
1690 log_index as u32,
1691 );
1692
1693 let state = PoolState {
1694 current_tick: row.get("current_tick"),
1695 price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1696 liquidity: row.get::<String, _>("liquidity").parse()?,
1697 protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1698 protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1699 fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1700 fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1701 fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1702 };
1703
1704 let analytics = PoolAnalytics {
1705 total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1706 total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1707 total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1708 total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1709 total_swaps: row.get::<i32, _>("total_swaps") as u64,
1710 total_mints: row.get::<i32, _>("total_mints") as u64,
1711 total_burns: row.get::<i32, _>("total_burns") as u64,
1712 total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1713 total_flashes: row.get::<i32, _>("total_flashes") as u64,
1714 liquidity_utilization_rate: row.get::<f64, _>("liquidity_utilization_rate"),
1715 };
1716
1717 let positions = self
1719 .load_pool_positions_for_snapshot(
1720 chain_id,
1721 pool_address,
1722 block as u64,
1723 transaction_index as u32,
1724 log_index as u32,
1725 )
1726 .await?;
1727
1728 let ticks = self
1729 .load_pool_ticks_for_snapshot(
1730 chain_id,
1731 pool_address,
1732 block as u64,
1733 transaction_index as u32,
1734 log_index as u32,
1735 )
1736 .await?;
1737
1738 let dex_name: String = row.get("dex_name");
1739 let chain = nautilus_model::defi::Chain::from_chain_id(chain_id)
1740 .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {}", chain_id))?;
1741
1742 let dex_type = nautilus_model::defi::DexType::from_dex_name(&dex_name)
1743 .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {}", dex_name))?;
1744
1745 let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1746 .ok_or_else(|| {
1747 anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1748 })?;
1749
1750 let instrument_id =
1751 Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_address);
1752
1753 Ok(Some(PoolSnapshot::new(
1754 instrument_id,
1755 state,
1756 positions,
1757 ticks,
1758 analytics,
1759 block_position,
1760 )))
1761 } else {
1762 Ok(None)
1763 }
1764 }
1765
1766 pub async fn mark_pool_snapshot_valid(
1772 &self,
1773 chain_id: u32,
1774 pool_address: &Address,
1775 block: u64,
1776 transaction_index: u32,
1777 log_index: u32,
1778 ) -> anyhow::Result<()> {
1779 sqlx::query(
1780 r"
1781 UPDATE pool_snapshot
1782 SET is_valid = TRUE
1783 WHERE chain_id = $1
1784 AND pool_address = $2
1785 AND block = $3
1786 AND transaction_index = $4
1787 AND log_index = $5
1788 ",
1789 )
1790 .bind(chain_id as i32)
1791 .bind(pool_address.to_string())
1792 .bind(block as i64)
1793 .bind(transaction_index as i32)
1794 .bind(log_index as i32)
1795 .execute(&self.pool)
1796 .await
1797 .map(|_| ())
1798 .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {}", e))
1799 }
1800
1801 pub async fn load_pool_positions_for_snapshot(
1807 &self,
1808 chain_id: u32,
1809 pool_address: &Address,
1810 snapshot_block: u64,
1811 snapshot_transaction_index: u32,
1812 snapshot_log_index: u32,
1813 ) -> anyhow::Result<Vec<PoolPosition>> {
1814 let rows = sqlx::query(
1815 r"
1816 SELECT
1817 owner, tick_lower, tick_upper,
1818 liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1819 tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1820 total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1821 total_amount0_collected::TEXT, total_amount1_collected::TEXT
1822 FROM pool_position
1823 WHERE chain_id = $1
1824 AND pool_address = $2
1825 AND snapshot_block = $3
1826 AND snapshot_transaction_index = $4
1827 AND snapshot_log_index = $5
1828 ",
1829 )
1830 .bind(chain_id as i32)
1831 .bind(pool_address.to_string())
1832 .bind(snapshot_block as i64)
1833 .bind(snapshot_transaction_index as i32)
1834 .bind(snapshot_log_index as i32)
1835 .fetch_all(&self.pool)
1836 .await
1837 .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {}", e))?;
1838
1839 rows.iter()
1840 .map(|row| {
1841 let owner: String = row.get("owner");
1842 let position = PoolPosition {
1843 owner: validate_address(&owner)?,
1844 tick_lower: row.get("tick_lower"),
1845 tick_upper: row.get("tick_upper"),
1846 liquidity: row.get::<String, _>("liquidity").parse()?,
1847 fee_growth_inside_0_last: row
1848 .get::<String, _>("fee_growth_inside_0_last")
1849 .parse()?,
1850 fee_growth_inside_1_last: row
1851 .get::<String, _>("fee_growth_inside_1_last")
1852 .parse()?,
1853 tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1854 tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1855 total_amount0_deposited: row
1856 .get::<String, _>("total_amount0_deposited")
1857 .parse()?,
1858 total_amount1_deposited: row
1859 .get::<String, _>("total_amount1_deposited")
1860 .parse()?,
1861 total_amount0_collected: row
1862 .get::<String, _>("total_amount0_collected")
1863 .parse()?,
1864 total_amount1_collected: row
1865 .get::<String, _>("total_amount1_collected")
1866 .parse()?,
1867 };
1868 Ok(position)
1869 })
1870 .collect()
1871 }
1872
1873 pub async fn load_pool_ticks_for_snapshot(
1879 &self,
1880 chain_id: u32,
1881 pool_address: &Address,
1882 snapshot_block: u64,
1883 snapshot_transaction_index: u32,
1884 snapshot_log_index: u32,
1885 ) -> anyhow::Result<Vec<PoolTick>> {
1886 let rows = sqlx::query(
1887 r"
1888 SELECT
1889 tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1890 fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1891 last_updated_block
1892 FROM pool_tick
1893 WHERE chain_id = $1
1894 AND pool_address = $2
1895 AND snapshot_block = $3
1896 AND snapshot_transaction_index = $4
1897 AND snapshot_log_index = $5
1898 ",
1899 )
1900 .bind(chain_id as i32)
1901 .bind(pool_address.to_string())
1902 .bind(snapshot_block as i64)
1903 .bind(snapshot_transaction_index as i32)
1904 .bind(snapshot_log_index as i32)
1905 .fetch_all(&self.pool)
1906 .await
1907 .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {}", e))?;
1908
1909 rows.iter()
1910 .map(|row| {
1911 let tick = PoolTick::new(
1912 row.get("tick_value"),
1913 row.get::<String, _>("liquidity_gross").parse()?,
1914 row.get::<String, _>("liquidity_net").parse()?,
1915 row.get::<String, _>("fee_growth_outside_0").parse()?,
1916 row.get::<String, _>("fee_growth_outside_1").parse()?,
1917 row.get("initialized"),
1918 row.get::<i64, _>("last_updated_block") as u64,
1919 );
1920 Ok(tick)
1921 })
1922 .collect()
1923 }
1924
1925 pub fn stream_pool_events<'a>(
1938 &'a self,
1939 chain: SharedChain,
1940 dex: SharedDex,
1941 instrument_id: InstrumentId,
1942 pool_address: &Address,
1943 from_position: Option<BlockPosition>,
1944 ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1945 const QUERY_ALL: &str = r"
1947 (SELECT
1948 'swap' as event_type,
1949 chain_id,
1950 pool_address,
1951 block,
1952 transaction_hash,
1953 transaction_index,
1954 log_index,
1955 sender,
1956 recipient,
1957 NULL::TEXT as owner,
1958 sqrt_price_x96::TEXT,
1959 liquidity::TEXT as swap_liquidity,
1960 tick as swap_tick,
1961 amount0::TEXT as swap_amount0,
1962 amount1::TEXT as swap_amount1,
1963 NULL::TEXT as position_liquidity,
1964 NULL::TEXT as amount0,
1965 NULL::TEXT as amount1,
1966 NULL::INT as tick_lower,
1967 NULL::INT as tick_upper,
1968 NULL::TEXT as liquidity_event_type,
1969 NULL::TEXT as flash_amount0,
1970 NULL::TEXT as flash_amount1,
1971 NULL::TEXT as flash_paid0,
1972 NULL::TEXT as flash_paid1
1973 FROM pool_swap_event
1974 WHERE chain_id = $1 AND pool_address = $2)
1975 UNION ALL
1976 (SELECT
1977 'liquidity' as event_type,
1978 chain_id,
1979 pool_address,
1980 block,
1981 transaction_hash,
1982 transaction_index,
1983 log_index,
1984 sender,
1985 NULL::TEXT as recipient,
1986 owner,
1987 NULL::text as sqrt_price_x96,
1988 NULL::TEXT as swap_liquidity,
1989 NULL::INT as swap_tick,
1990 amount0::TEXT as swap_amount0,
1991 amount1::TEXT as swap_amount1,
1992 position_liquidity::TEXT,
1993 amount0::TEXT,
1994 amount1::TEXT,
1995 tick_lower::INT,
1996 tick_upper::INT,
1997 event_type as liquidity_event_type,
1998 NULL::TEXT as flash_amount0,
1999 NULL::TEXT as flash_amount1,
2000 NULL::TEXT as flash_paid0,
2001 NULL::TEXT as flash_paid1
2002 FROM pool_liquidity_event
2003 WHERE chain_id = $1 AND pool_address = $2)
2004 UNION ALL
2005 (SELECT
2006 'collect' as event_type,
2007 chain_id,
2008 pool_address,
2009 block,
2010 transaction_hash,
2011 transaction_index,
2012 log_index,
2013 NULL::TEXT as sender,
2014 NULL::TEXT as recipient,
2015 owner,
2016 NULL::TEXT as sqrt_price_x96,
2017 NULL::TEXT as swap_liquidity,
2018 NULL::INT AS swap_tick,
2019 amount0::TEXT as swap_amount0,
2020 amount1::TEXT as swap_amount1,
2021 NULL::TEXT as position_liquidity,
2022 amount0::TEXT,
2023 amount1::TEXT,
2024 tick_lower::INT,
2025 tick_upper::INT,
2026 NULL::TEXT as liquidity_event_type,
2027 NULL::TEXT as flash_amount0,
2028 NULL::TEXT as flash_amount1,
2029 NULL::TEXT as flash_paid0,
2030 NULL::TEXT as flash_paid1
2031 FROM pool_collect_event
2032 WHERE chain_id = $1 AND pool_address = $2)
2033 UNION ALL
2034 (SELECT
2035 'flash' as event_type,
2036 chain_id,
2037 pool_address,
2038 block,
2039 transaction_hash,
2040 transaction_index,
2041 log_index,
2042 sender,
2043 recipient,
2044 NULL::TEXT as owner,
2045 NULL::TEXT as sqrt_price_x96,
2046 NULL::TEXT as swap_liquidity,
2047 NULL::INT AS swap_tick,
2048 NULL::TEXT as swap_amount0,
2049 NULL::TEXT as swap_amount1,
2050 NULL::TEXT as position_liquidity,
2051 NULL::TEXT as amount0,
2052 NULL::TEXT as amount1,
2053 NULL::INT as tick_lower,
2054 NULL::INT as tick_upper,
2055 NULL::TEXT as liquidity_event_type,
2056 amount0::TEXT as flash_amount0,
2057 amount1::TEXT as flash_amount1,
2058 paid0::TEXT as flash_paid0,
2059 paid1::TEXT as flash_paid1
2060 FROM pool_flash_event
2061 WHERE chain_id = $1 AND pool_address = $2)
2062 ORDER BY block, transaction_index, log_index";
2063
2064 const QUERY_FROM_POSITION: &str = r"
2066 (SELECT
2067 'swap' as event_type,
2068 chain_id,
2069 pool_address,
2070 block,
2071 transaction_hash,
2072 transaction_index,
2073 log_index,
2074 sender,
2075 recipient,
2076 NULL::TEXT as owner,
2077 sqrt_price_x96::TEXT,
2078 liquidity::TEXT as swap_liquidity,
2079 tick as swap_tick,
2080 amount0::TEXT as swap_amount0,
2081 amount1::TEXT as swap_amount1,
2082 NULL::TEXT as position_liquidity,
2083 NULL::TEXT as amount0,
2084 NULL::TEXT as amount1,
2085 NULL::INT as tick_lower,
2086 NULL::INT as tick_upper,
2087 NULL::TEXT as liquidity_event_type,
2088 NULL::TEXT as flash_amount0,
2089 NULL::TEXT as flash_amount1,
2090 NULL::TEXT as flash_paid0,
2091 NULL::TEXT as flash_paid1
2092 FROM pool_swap_event
2093 WHERE chain_id = $1 AND pool_address = $2
2094 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2095 UNION ALL
2096 (SELECT
2097 'liquidity' as event_type,
2098 chain_id,
2099 pool_address,
2100 block,
2101 transaction_hash,
2102 transaction_index,
2103 log_index,
2104 sender,
2105 NULL::TEXT as recipient,
2106 owner,
2107 NULL::text as sqrt_price_x96,
2108 NULL::TEXT as swap_liquidity,
2109 NULL::INT as swap_tick,
2110 amount0::TEXT as swap_amount0,
2111 amount1::TEXT as swap_amount1,
2112 position_liquidity::TEXT,
2113 amount0::TEXT,
2114 amount1::TEXT,
2115 tick_lower::INT,
2116 tick_upper::INT,
2117 event_type as liquidity_event_type,
2118 NULL::TEXT as flash_amount0,
2119 NULL::TEXT as flash_amount1,
2120 NULL::TEXT as flash_paid0,
2121 NULL::TEXT as flash_paid1
2122 FROM pool_liquidity_event
2123 WHERE chain_id = $1 AND pool_address = $2
2124 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2125 UNION ALL
2126 (SELECT
2127 'collect' as event_type,
2128 chain_id,
2129 pool_address,
2130 block,
2131 transaction_hash,
2132 transaction_index,
2133 log_index,
2134 NULL::TEXT as sender,
2135 NULL::TEXT as recipient,
2136 owner,
2137 NULL::TEXT as sqrt_price_x96,
2138 NULL::TEXT as swap_liquidity,
2139 NULL::INT AS swap_tick,
2140 amount0::TEXT as swap_amount0,
2141 amount1::TEXT as swap_amount1,
2142 NULL::TEXT as position_liquidity,
2143 amount0::TEXT,
2144 amount1::TEXT,
2145 tick_lower::INT,
2146 tick_upper::INT,
2147 NULL::TEXT as liquidity_event_type,
2148 NULL::TEXT as flash_amount0,
2149 NULL::TEXT as flash_amount1,
2150 NULL::TEXT as flash_paid0,
2151 NULL::TEXT as flash_paid1
2152 FROM pool_collect_event
2153 WHERE chain_id = $1 AND pool_address = $2
2154 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2155 UNION ALL
2156 (SELECT
2157 'flash' as event_type,
2158 chain_id,
2159 pool_address,
2160 block,
2161 transaction_hash,
2162 transaction_index,
2163 log_index,
2164 sender,
2165 recipient,
2166 NULL::TEXT as owner,
2167 NULL::TEXT as sqrt_price_x96,
2168 NULL::TEXT as swap_liquidity,
2169 NULL::INT AS swap_tick,
2170 NULL::TEXT as swap_amount0,
2171 NULL::TEXT as swap_amount1,
2172 NULL::TEXT as position_liquidity,
2173 NULL::TEXT as amount0,
2174 NULL::TEXT as amount1,
2175 NULL::INT as tick_lower,
2176 NULL::INT as tick_upper,
2177 NULL::TEXT as liquidity_event_type,
2178 amount0::TEXT as flash_amount0,
2179 amount1::TEXT as flash_amount1,
2180 paid0::TEXT as flash_paid0,
2181 paid1::TEXT as flash_paid1
2182 FROM pool_flash_event
2183 WHERE chain_id = $1 AND pool_address = $2
2184 AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2185 ORDER BY block, transaction_index, log_index";
2186
2187 let query = if let Some(pos) = from_position {
2189 sqlx::query(QUERY_FROM_POSITION)
2190 .bind(chain.chain_id as i32)
2191 .bind(pool_address.to_string())
2192 .bind(pos.number as i64)
2193 .bind(pos.transaction_index as i32)
2194 .bind(pos.log_index as i32)
2195 .fetch(&self.pool)
2196 } else {
2197 sqlx::query(QUERY_ALL)
2198 .bind(chain.chain_id as i32)
2199 .bind(pool_address.to_string())
2200 .fetch(&self.pool)
2201 };
2202
2203 let stream = query.map(move |row_result| match row_result {
2205 Ok(row) => {
2206 transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2207 .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e))
2208 }
2209 Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {}", e)),
2210 });
2211
2212 Box::pin(stream)
2213 }
2214}