nautilus_blockchain/cache/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21    defi::{
22        Block, Chain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolSwap, SharedChain,
23        SharedDex, Token,
24        data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
25        pool_analysis::{
26            position::PoolPosition,
27            snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
28        },
29        tick_map::tick::PoolTick,
30        validation::validate_address,
31    },
32    identifiers::InstrumentId,
33};
34use rust_decimal::Decimal;
35use sqlx::{PgPool, Row, postgres::PgConnectOptions};
36
37use crate::{
38    cache::{
39        consistency::CachedBlocksConsistencyStatus,
40        copy::PostgresCopyHandler,
41        rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
42        types::{U128Pg, U256Pg},
43    },
44    events::initialize::InitializeEvent,
45};
46
/// Database interface for persisting and retrieving blockchain entities and domain objects.
///
/// Holds a single sqlx [`PgPool`]; the methods on this type execute their statements
/// against that pool.
#[derive(Debug)]
pub struct BlockchainCacheDatabase {
    /// PostgreSQL connection pool used for database operations.
    pool: PgPool,
}
53
54impl BlockchainCacheDatabase {
55    /// Initializes a new database instance by establishing a connection to PostgreSQL.
56    ///
57    /// # Panics
58    ///
59    /// Panics if unable to connect to PostgreSQL with the provided options.
60    pub async fn init(pg_options: PgConnectOptions) -> Self {
61        let pool = sqlx::postgres::PgPoolOptions::new()
62            .max_connections(32) // Increased from default 10
63            .min_connections(5) // Keep some connections warm
64            .acquire_timeout(std::time::Duration::from_secs(3))
65            .connect_with(pg_options)
66            .await
67            .expect("Error connecting to Postgres");
68        Self { pool }
69    }
70
71    /// Seeds the database with a blockchain chain record.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the database operation fails.
76    pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
77        sqlx::query(
78            r"
79            INSERT INTO chain (
80                chain_id, name
81            ) VALUES ($1,$2)
82            ON CONFLICT (chain_id)
83            DO NOTHING
84        ",
85        )
86        .bind(chain.chain_id as i32)
87        .bind(chain.name.to_string())
88        .execute(&self.pool)
89        .await
90        .map(|_| ())
91        .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
92    }
93
94    /// Creates a table partition for the block table specific to the given chain
95    /// by calling the existing PostgreSQL function `create_block_partition`.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the database operation fails.
100    pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
101        let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
102            .bind(chain.chain_id as i32)
103            .fetch_one(&self.pool)
104            .await
105            .map_err(|e| {
106                anyhow::anyhow!(
107                    "Failed to call create_block_partition for chain {}: {e}",
108                    chain.chain_id
109                )
110            })?;
111
112        Ok(result.0)
113    }
114
115    /// Creates a table partition for the token table specific to the given chain
116    /// by calling the existing PostgreSQL function `create_token_partition`.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the database operation fails.
121    pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
122        let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
123            .bind(chain.chain_id as i32)
124            .fetch_one(&self.pool)
125            .await
126            .map_err(|e| {
127                anyhow::anyhow!(
128                    "Failed to call create_token_partition for chain {}: {e}",
129                    chain.chain_id
130                )
131            })?;
132
133        Ok(result.0)
134    }
135
136    /// Returns the highest block number that maintains data continuity in the database.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the database query fails.
141    pub async fn get_block_consistency_status(
142        &self,
143        chain: &Chain,
144    ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
145        tracing::info!("Fetching block consistency status");
146
147        let result: (i64, i64) = sqlx::query_as(
148            r"
149            SELECT
150                COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
151                get_last_continuous_block($1) as last_continuous_block
152            "
153        )
154        .bind(chain.chain_id as i32)
155        .fetch_one(&self.pool)
156        .await
157        .map_err(|e| {
158            anyhow::anyhow!(
159                "Failed to get block info for chain {}: {}",
160                chain.chain_id,
161                e
162            )
163        })?;
164
165        Ok(CachedBlocksConsistencyStatus::new(
166            result.0 as u64,
167            result.1 as u64,
168        ))
169    }
170
171    /// Inserts or updates a block record in the database.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the database operation fails.
176    pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
177        sqlx::query(
178            r"
179            INSERT INTO block (
180                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
181                base_fee_per_gas, blob_gas_used, excess_blob_gas,
182                l1_gas_price, l1_gas_used, l1_fee_scalar
183            ) VALUES (
184                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
185            )
186            ON CONFLICT (chain_id, number)
187            DO UPDATE
188            SET
189                hash = $3,
190                parent_hash = $4,
191                miner = $5,
192                gas_limit = $6,
193                gas_used = $7,
194                timestamp = $8,
195                base_fee_per_gas = $9,
196                blob_gas_used = $10,
197                excess_blob_gas = $11,
198                l1_gas_price = $12,
199                l1_gas_used = $13,
200                l1_fee_scalar = $14
201        ",
202        )
203        .bind(chain_id as i32)
204        .bind(block.number as i64)
205        .bind(block.hash.as_str())
206        .bind(block.parent_hash.as_str())
207        .bind(block.miner.as_str())
208        .bind(block.gas_limit as i64)
209        .bind(block.gas_used as i64)
210        .bind(block.timestamp.to_string())
211        .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
212        .bind(block.blob_gas_used.as_ref().map(U256::to_string))
213        .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
214        .bind(block.l1_gas_price.as_ref().map(U256::to_string))
215        .bind(block.l1_gas_used.map(|v| v as i64))
216        .bind(block.l1_fee_scalar.map(|v| v as i64))
217        .execute(&self.pool)
218        .await
219        .map(|_| ())
220        .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
221    }
222
223    /// Inserts multiple blocks in a single database operation using UNNEST for optimal performance.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the database operation fails.
228    pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
229        if blocks.is_empty() {
230            return Ok(());
231        }
232
233        // Prepare vectors for each column
234        let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
235        let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
236        let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
237        let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
238        let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
239        let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
240        let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
241        let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242        let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243        let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
244        let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
245        let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
246        let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
247
248        // Fill vectors from blocks
249        for block in blocks {
250            numbers.push(block.number as i64);
251            hashes.push(block.hash.clone());
252            parent_hashes.push(block.parent_hash.clone());
253            miners.push(block.miner.to_string());
254            gas_limits.push(block.gas_limit as i64);
255            gas_useds.push(block.gas_used as i64);
256            timestamps.push(block.timestamp.to_string());
257            base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
258            blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
259            excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
260            l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
261            l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
262            l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
263        }
264
265        // Execute batch insert with UNNEST
266        sqlx::query(
267            r"
268            INSERT INTO block (
269                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
270                base_fee_per_gas, blob_gas_used, excess_blob_gas,
271                l1_gas_price, l1_gas_used, l1_fee_scalar
272            )
273            SELECT
274                $1, *
275            FROM UNNEST(
276                $2::int8[], $3::text[], $4::text[], $5::text[],
277                $6::int8[], $7::int8[], $8::text[],
278                $9::text[], $10::text[], $11::text[],
279                $12::text[], $13::int8[], $14::int8[]
280            )
281            ON CONFLICT (chain_id, number) DO NOTHING
282           ",
283        )
284        .bind(chain_id as i32)
285        .bind(&numbers[..])
286        .bind(&hashes[..])
287        .bind(&parent_hashes[..])
288        .bind(&miners[..])
289        .bind(&gas_limits[..])
290        .bind(&gas_useds[..])
291        .bind(&timestamps[..])
292        .bind(&base_fee_per_gases as &[Option<String>])
293        .bind(&blob_gas_useds as &[Option<String>])
294        .bind(&excess_blob_gases as &[Option<String>])
295        .bind(&l1_gas_prices as &[Option<String>])
296        .bind(&l1_gas_useds as &[Option<i64>])
297        .bind(&l1_fee_scalars as &[Option<i64>])
298        .execute(&self.pool)
299        .await
300        .map(|_| ())
301        .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
302    }
303
304    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
305    ///
306    /// This method is significantly faster than INSERT for bulk operations as it bypasses
307    /// SQL parsing and uses PostgreSQL's native binary protocol.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the COPY operation fails.
312    pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
313        let copy_handler = PostgresCopyHandler::new(&self.pool);
314        copy_handler.copy_blocks(chain_id, blocks).await
315    }
316
317    /// Inserts tokens using PostgreSQL COPY BINARY for maximum performance.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the COPY operation fails.
322    pub async fn add_tokens_copy(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
323        let copy_handler = PostgresCopyHandler::new(&self.pool);
324        copy_handler.copy_tokens(chain_id, tokens).await
325    }
326
327    /// Inserts pools using PostgreSQL COPY BINARY for maximum performance.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the COPY operation fails.
332    pub async fn add_pools_copy(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
333        let copy_handler = PostgresCopyHandler::new(&self.pool);
334        copy_handler.copy_pools(chain_id, pools).await
335    }
336
337    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
338    ///
339    /// This method is significantly faster than INSERT for bulk operations as it bypasses
340    /// SQL parsing and uses PostgreSQL's native binary protocol.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the COPY operation fails.
345    pub async fn add_pool_swaps_copy(
346        &self,
347        chain_id: u32,
348        swaps: &[PoolSwap],
349    ) -> anyhow::Result<()> {
350        let copy_handler = PostgresCopyHandler::new(&self.pool);
351        copy_handler.copy_pool_swaps(chain_id, swaps).await
352    }
353
354    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
355    ///
356    /// This method is significantly faster than INSERT for bulk operations as it bypasses
357    /// SQL parsing and uses PostgreSQL's native binary protocol.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the COPY operation fails.
362    pub async fn add_pool_liquidity_updates_copy(
363        &self,
364        chain_id: u32,
365        updates: &[PoolLiquidityUpdate],
366    ) -> anyhow::Result<()> {
367        let copy_handler = PostgresCopyHandler::new(&self.pool);
368        copy_handler
369            .copy_pool_liquidity_updates(chain_id, updates)
370            .await
371    }
372
373    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
374    ///
375    /// This method is significantly faster than INSERT for bulk operations as it bypasses
376    /// SQL parsing and most database validation checks.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if the COPY operation fails.
381    pub async fn copy_pool_fee_collects_batch(
382        &self,
383        chain_id: u32,
384        collects: &[PoolFeeCollect],
385    ) -> anyhow::Result<()> {
386        let copy_handler = PostgresCopyHandler::new(&self.pool);
387        copy_handler.copy_pool_collects(chain_id, collects).await
388    }
389
390    /// Retrieves block timestamps for a given chain starting from a specific block number.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the database query fails.
395    pub async fn load_block_timestamps(
396        &self,
397        chain: SharedChain,
398        from_block: u64,
399    ) -> anyhow::Result<Vec<BlockTimestampRow>> {
400        sqlx::query_as::<_, BlockTimestampRow>(
401            r"
402            SELECT
403                number,
404                timestamp
405            FROM block
406            WHERE chain_id = $1 AND number >= $2
407            ORDER BY number ASC
408            ",
409        )
410        .bind(chain.chain_id as i32)
411        .bind(from_block as i64)
412        .fetch_all(&self.pool)
413        .await
414        .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
415    }
416
417    /// Adds or updates a DEX (Decentralized Exchange) record in the database.
418    ///
419    /// # Errors
420    ///
421    /// Returns an error if the database operation fails.
422    pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
423        sqlx::query(
424            r"
425            INSERT INTO dex (
426                chain_id, name, factory_address, creation_block
427            ) VALUES ($1, $2, $3, $4)
428            ON CONFLICT (chain_id, name)
429            DO UPDATE
430            SET
431                factory_address = $3,
432                creation_block = $4
433        ",
434        )
435        .bind(dex.chain.chain_id as i32)
436        .bind(dex.name.to_string())
437        .bind(dex.factory.to_string())
438        .bind(dex.factory_creation_block as i64)
439        .execute(&self.pool)
440        .await
441        .map(|_| ())
442        .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
443    }
444
445    /// Adds or updates a liquidity pool/pair record in the database.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the database operation fails.
450    pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
451        sqlx::query(
452            r"
453            INSERT INTO pool (
454                chain_id, address, pool_identifier, dex_name, creation_block,
455                token0_chain, token0_address,
456                token1_chain, token1_address,
457                fee, tick_spacing, initial_tick, initial_sqrt_price_x96, hook_address
458            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
459            ON CONFLICT (chain_id, dex_name, pool_identifier)
460            DO UPDATE
461            SET
462                address = $2,
463                creation_block = $5,
464                token0_chain = $6,
465                token0_address = $7,
466                token1_chain = $8,
467                token1_address = $9,
468                fee = $10,
469                tick_spacing = $11,
470                initial_tick = $12,
471                initial_sqrt_price_x96 = $13,
472                hook_address = $14
473        ",
474        )
475        .bind(pool.chain.chain_id as i32)
476        .bind(pool.address.to_string())
477        .bind(pool.pool_identifier.as_ref())
478        .bind(pool.dex.name.to_string())
479        .bind(pool.creation_block as i64)
480        .bind(pool.token0.chain.chain_id as i32)
481        .bind(pool.token0.address.to_string())
482        .bind(pool.token1.chain.chain_id as i32)
483        .bind(pool.token1.address.to_string())
484        .bind(pool.fee.map(|fee| fee as i32))
485        .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
486        .bind(pool.initial_tick)
487        .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
488        .bind(pool.hooks.as_ref().map(|h| h.to_string()))
489        .execute(&self.pool)
490        .await
491        .map(|_| ())
492        .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
493    }
494
495    /// Inserts multiple pools in a single database operation using UNNEST for optimal performance.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if the database operation fails.
500    pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
501        if pools.is_empty() {
502            return Ok(());
503        }
504
505        // Prepare vectors for each column
506        let len = pools.len();
507        let mut addresses: Vec<String> = Vec::with_capacity(len);
508        let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
509        let mut dex_names: Vec<String> = Vec::with_capacity(len);
510        let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
511        let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
512        let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
513        let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
514        let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
515        let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
516        let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
517        let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
518        let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
519        let mut hook_addresses: Vec<Option<String>> = Vec::with_capacity(len);
520        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
521
522        // Fill vectors from pools
523        for pool in pools {
524            chain_ids.push(pool.chain.chain_id as i32);
525            addresses.push(pool.address.to_string());
526            pool_identifiers.push(pool.pool_identifier.to_string());
527            dex_names.push(pool.dex.name.to_string());
528            creation_blocks.push(pool.creation_block as i64);
529            token0_chains.push(pool.token0.chain.chain_id as i32);
530            token0_addresses.push(pool.token0.address.to_string());
531            token1_chains.push(pool.token1.chain.chain_id as i32);
532            token1_addresses.push(pool.token1.address.to_string());
533            fees.push(pool.fee.map(|fee| fee as i32));
534            tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
535            initial_ticks.push(pool.initial_tick);
536            initial_sqrt_price_x96s
537                .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
538            hook_addresses.push(pool.hooks.as_ref().map(|h| h.to_string()));
539        }
540
541        // Execute batch insert with UNNEST
542        sqlx::query(
543            r"
544            INSERT INTO pool (
545                chain_id, address, pool_identifier, dex_name, creation_block,
546                token0_chain, token0_address,
547                token1_chain, token1_address,
548                fee, tick_spacing, initial_tick, initial_sqrt_price_x96, hook_address
549            )
550            SELECT *
551            FROM UNNEST(
552                $1::int4[], $2::text[], $3::text[], $4::text[], $5::int8[],
553                $6::int4[], $7::text[], $8::int4[], $9::text[],
554                $10::int4[], $11::int4[], $12::int4[], $13::text[], $14::text[]
555            )
556            ON CONFLICT (chain_id, dex_name, pool_identifier) DO NOTHING
557           ",
558        )
559        .bind(&chain_ids[..])
560        .bind(&addresses[..])
561        .bind(&pool_identifiers[..])
562        .bind(&dex_names[..])
563        .bind(&creation_blocks[..])
564        .bind(&token0_chains[..])
565        .bind(&token0_addresses[..])
566        .bind(&token1_chains[..])
567        .bind(&token1_addresses[..])
568        .bind(&fees[..])
569        .bind(&tick_spacings[..])
570        .bind(&initial_ticks[..])
571        .bind(&initial_sqrt_price_x96s[..])
572        .bind(&hook_addresses as &[Option<String>])
573        .execute(&self.pool)
574        .await
575        .map(|_| ())
576        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
577    }
578
579    /// Inserts multiple pool swaps in a single database operation using UNNEST for optimal performance.
580    ///
581    /// # Errors
582    ///
583    /// Returns an error if the database operation fails.
584    pub async fn add_pool_swaps_batch(
585        &self,
586        chain_id: u32,
587        swaps: &[PoolSwap],
588    ) -> anyhow::Result<()> {
589        if swaps.is_empty() {
590            return Ok(());
591        }
592
593        // Prepare vectors for each column
594        let len = swaps.len();
595        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
596        let mut dex_names: Vec<String> = Vec::with_capacity(len);
597        let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
598        let mut blocks: Vec<i64> = Vec::with_capacity(len);
599        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
600        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
601        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
602        let mut senders: Vec<String> = Vec::with_capacity(len);
603        let mut recipients: Vec<String> = Vec::with_capacity(len);
604        let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
605        let mut liquidities: Vec<String> = Vec::with_capacity(len);
606        let mut ticks: Vec<i32> = Vec::with_capacity(len);
607        let mut amount0s: Vec<String> = Vec::with_capacity(len);
608        let mut amount1s: Vec<String> = Vec::with_capacity(len);
609        let mut order_sides: Vec<Option<String>> = Vec::with_capacity(len);
610        let mut base_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
611        let mut quote_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
612        let mut spot_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
613        let mut execution_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
614
615        // Fill vectors from swaps
616        for swap in swaps {
617            chain_ids.push(chain_id as i32);
618            dex_names.push(swap.dex.name.to_string());
619            pool_identifiers.push(swap.pool_identifier.to_string());
620            blocks.push(swap.block as i64);
621            transaction_hashes.push(swap.transaction_hash.clone());
622            transaction_indices.push(swap.transaction_index as i32);
623            log_indices.push(swap.log_index as i32);
624            senders.push(swap.sender.to_string());
625            recipients.push(swap.recipient.to_string());
626            sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
627            liquidities.push(swap.liquidity.to_string());
628            ticks.push(swap.tick);
629            amount0s.push(swap.amount0.to_string());
630            amount1s.push(swap.amount1.to_string());
631
632            // Extract trade_info fields if available
633            if let Some(ref trade_info) = swap.trade_info {
634                order_sides.push(Some(trade_info.order_side.to_string()));
635                base_quantities.push(Some(trade_info.quantity_base.as_decimal()));
636                quote_quantities.push(Some(trade_info.quantity_quote.as_decimal()));
637                spot_prices.push(Some(trade_info.spot_price.as_decimal()));
638                execution_prices.push(Some(trade_info.execution_price.as_decimal()));
639            } else {
640                order_sides.push(None);
641                base_quantities.push(None);
642                quote_quantities.push(None);
643                spot_prices.push(None);
644                execution_prices.push(None);
645            }
646        }
647
648        // Execute batch insert with UNNEST
649        sqlx::query(
650            r"
651            INSERT INTO pool_swap_event (
652                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
653                log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
654                order_side, base_quantity, quote_quantity, spot_price, execution_price
655            )
656            SELECT
657                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index, log_index, sender, recipient,
658                sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256,
659                order_side, base_quantity, quote_quantity, spot_price, execution_price
660            FROM UNNEST(
661                $1::INT[], $2::TEXT[], $3::TEXT[], $4::BIGINT[], $5::TEXT[], $6::INT[], $7::INT[],
662                $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::INT[], $13::TEXT[], $14::TEXT[],
663                $15::TEXT[], $16, $17, $18, $19
664            ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
665                   log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
666                   order_side, base_quantity, quote_quantity, spot_price, execution_price)
667            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
668           ",
669        )
670        .bind(&chain_ids[..])
671        .bind(&dex_names[..])
672        .bind(&pool_identifiers[..])
673        .bind(&blocks[..])
674        .bind(&transaction_hashes[..])
675        .bind(&transaction_indices[..])
676        .bind(&log_indices[..])
677        .bind(&senders[..])
678        .bind(&recipients[..])
679        .bind(&sqrt_price_x96s[..])
680        .bind(&liquidities[..])
681        .bind(&ticks[..])
682        .bind(&amount0s[..])
683        .bind(&amount1s[..])
684        .bind(&order_sides[..])
685        .bind(&base_quantities[..])
686        .bind(&quote_quantities[..])
687        .bind(&spot_prices[..])
688        .bind(&execution_prices[..])
689        .execute(&self.pool)
690        .await
691        .map(|_| ())
692        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
693    }
694
695    /// Inserts multiple pool liquidity updates in a single database operation using UNNEST for optimal performance.
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the database operation fails.
700    pub async fn add_pool_liquidity_updates_batch(
701        &self,
702        chain_id: u32,
703        updates: &[PoolLiquidityUpdate],
704    ) -> anyhow::Result<()> {
705        if updates.is_empty() {
706            return Ok(());
707        }
708
709        // Prepare vectors for each column
710        let len = updates.len();
711        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
712        let mut dex_names: Vec<String> = Vec::with_capacity(len);
713        let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
714        let mut blocks: Vec<i64> = Vec::with_capacity(len);
715        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
716        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
717        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
718        let mut event_types: Vec<String> = Vec::with_capacity(len);
719        let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
720        let mut owners: Vec<String> = Vec::with_capacity(len);
721        let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
722        let mut amount0s: Vec<String> = Vec::with_capacity(len);
723        let mut amount1s: Vec<String> = Vec::with_capacity(len);
724        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
725        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
726
727        // Fill vectors from updates
728        for update in updates {
729            chain_ids.push(chain_id as i32);
730            dex_names.push(update.dex.name.to_string());
731            pool_identifiers.push(update.pool_identifier.to_string());
732            blocks.push(update.block as i64);
733            transaction_hashes.push(update.transaction_hash.clone());
734            transaction_indices.push(update.transaction_index as i32);
735            log_indices.push(update.log_index as i32);
736            event_types.push(update.kind.to_string());
737            senders.push(update.sender.map(|s| s.to_string()));
738            owners.push(update.owner.to_string());
739            position_liquidities.push(update.position_liquidity.to_string());
740            amount0s.push(update.amount0.to_string());
741            amount1s.push(update.amount1.to_string());
742            tick_lowers.push(update.tick_lower);
743            tick_uppers.push(update.tick_upper);
744        }
745
746        // Execute batch insert with UNNEST
747        sqlx::query(
748            r"
749            INSERT INTO pool_liquidity_event (
750                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
751                log_index, event_type, sender, owner, position_liquidity,
752                amount0, amount1, tick_lower, tick_upper
753            )
754            SELECT
755                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
756                log_index, event_type, sender, owner, position_liquidity::u128,
757                amount0::U256, amount1::U256, tick_lower, tick_upper
758            FROM UNNEST(
759                $1::INT[], $2::TEXT[], $3::TEXT[], $4::INT[], $5::TEXT[], $6::INT[],
760                $7::INT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[],
761                $12::TEXT[], $13::TEXT[], $14::INT[], $15::INT[]
762            ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
763                   log_index, event_type, sender, owner, position_liquidity,
764                   amount0, amount1, tick_lower, tick_upper)
765            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
766           ",
767        )
768        .bind(&chain_ids[..])
769        .bind(&dex_names[..])
770        .bind(&pool_identifiers[..])
771        .bind(&blocks[..])
772        .bind(&transaction_hashes[..])
773        .bind(&transaction_indices[..])
774        .bind(&log_indices[..])
775        .bind(&event_types[..])
776        .bind(&senders[..])
777        .bind(&owners[..])
778        .bind(&position_liquidities[..])
779        .bind(&amount0s[..])
780        .bind(&amount1s[..])
781        .bind(&tick_lowers[..])
782        .bind(&tick_uppers[..])
783        .execute(&self.pool)
784        .await
785        .map(|_| ())
786        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
787    }
788
789    /// Adds or updates a token record in the database.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the database operation fails.
794    pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
795        sqlx::query(
796            r"
797            INSERT INTO token (
798                chain_id, address, name, symbol, decimals
799            ) VALUES ($1, $2, $3, $4, $5)
800            ON CONFLICT (chain_id, address)
801            DO UPDATE
802            SET
803                name = $3,
804                symbol = $4,
805                decimals = $5
806        ",
807        )
808        .bind(token.chain.chain_id as i32)
809        .bind(token.address.to_string())
810        .bind(token.name.as_str())
811        .bind(token.symbol.as_str())
812        .bind(i32::from(token.decimals))
813        .execute(&self.pool)
814        .await
815        .map(|_| ())
816        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
817    }
818
819    /// Records an invalid token address with associated error information.
820    ///
821    /// # Errors
822    ///
823    /// Returns an error if the database insertion fails.
824    pub async fn add_invalid_token(
825        &self,
826        chain_id: u32,
827        address: &Address,
828        error_string: &str,
829    ) -> anyhow::Result<()> {
830        sqlx::query(
831            r"
832            INSERT INTO token (
833                chain_id, address, error
834            ) VALUES ($1, $2, $3)
835            ON CONFLICT (chain_id, address)
836            DO NOTHING;
837        ",
838        )
839        .bind(chain_id as i32)
840        .bind(address.to_string())
841        .bind(error_string)
842        .execute(&self.pool)
843        .await
844        .map(|_| ())
845        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
846    }
847
848    /// Persists a token swap transaction event to the `pool_swap` table.
849    ///
850    /// # Errors
851    ///
852    /// Returns an error if the database operation fails.
853    pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
854        // Extract trade_info fields if available
855        let (order_side, base_quantity, quote_quantity, spot_price, execution_price) =
856            if let Some(ref trade_info) = swap.trade_info {
857                (
858                    Some(trade_info.order_side.to_string()),
859                    Some(trade_info.quantity_base.as_decimal()),
860                    Some(trade_info.quantity_quote.as_decimal()),
861                    Some(trade_info.spot_price.as_decimal()),
862                    Some(trade_info.execution_price.as_decimal()),
863                )
864            } else {
865                (None, None, None, None, None)
866            };
867
868        sqlx::query(
869            r"
870            INSERT INTO pool_swap_event (
871                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
872                log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
873                order_side, base_quantity, quote_quantity, spot_price, execution_price
874            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::U160, $11::U128, $12, $13::I256, $14::I256, $15, $16, $17, $18, $19)
875            ON CONFLICT (chain_id, transaction_hash, log_index)
876            DO NOTHING
877        ",
878        )
879        .bind(chain_id as i32)
880        .bind(swap.dex.name.to_string())
881        .bind(swap.pool_identifier.as_str())
882        .bind(swap.block as i64)
883        .bind(swap.transaction_hash.as_str())
884        .bind(swap.transaction_index as i32)
885        .bind(swap.log_index as i32)
886        .bind(swap.sender.to_string())
887        .bind(swap.recipient.to_string())
888        .bind(swap.sqrt_price_x96.to_string())
889        .bind(swap.liquidity.to_string())
890        .bind(swap.tick)
891        .bind(swap.amount0.to_string())
892        .bind(swap.amount1.to_string())
893        .bind(order_side)
894        .bind(base_quantity)
895        .bind(quote_quantity)
896        .bind(spot_price)
897        .bind(execution_price)
898        .execute(&self.pool)
899        .await
900        .map(|_| ())
901        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
902    }
903
904    /// Persists a liquidity position change (mint/burn) event to the `pool_liquidity` table.
905    ///
906    /// # Errors
907    ///
908    /// Returns an error if the database operation fails.
909    pub async fn add_pool_liquidity_update(
910        &self,
911        chain_id: u32,
912        liquidity_update: &PoolLiquidityUpdate,
913    ) -> anyhow::Result<()> {
914        sqlx::query(
915            r"
916            INSERT INTO pool_liquidity_event (
917                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index, log_index,
918                event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
919            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
920            ON CONFLICT (chain_id, transaction_hash, log_index)
921            DO NOTHING
922        ",
923        )
924        .bind(chain_id as i32)
925        .bind(liquidity_update.dex.name.to_string())
926        .bind(liquidity_update.pool_identifier.as_str())
927        .bind(liquidity_update.block as i64)
928        .bind(liquidity_update.transaction_hash.as_str())
929        .bind(liquidity_update.transaction_index as i32)
930        .bind(liquidity_update.log_index as i32)
931        .bind(liquidity_update.kind.to_string())
932        .bind(liquidity_update.sender.map(|sender| sender.to_string()))
933        .bind(liquidity_update.owner.to_string())
934        .bind(U128Pg(liquidity_update.position_liquidity))
935        .bind(U256Pg(liquidity_update.amount0))
936        .bind(U256Pg(liquidity_update.amount1))
937        .bind(liquidity_update.tick_lower)
938        .bind(liquidity_update.tick_upper)
939        .execute(&self.pool)
940        .await
941        .map(|_| ())
942        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
943    }
944
945    /// Retrieves all valid token records for the given chain and converts them into `Token` domain objects.
946    ///
947    /// Only returns tokens that do not contain error information, filtering out invalid tokens
948    /// that were previously recorded with error details.
949    ///
950    /// # Errors
951    ///
952    /// Returns an error if the database query fails.
953    pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
954        sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
955            .bind(chain.chain_id as i32)
956            .fetch_all(&self.pool)
957            .await
958            .map(|rows| {
959                rows.into_iter()
960                    .map(|token_row| {
961                        Token::new(
962                            chain.clone(),
963                            token_row.address,
964                            token_row.name,
965                            token_row.symbol,
966                            token_row.decimals as u8,
967                        )
968                    })
969                    .collect::<Vec<_>>()
970            })
971            .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
972    }
973
974    /// Retrieves all invalid token addresses for a given chain.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the database query fails or address validation fails.
979    pub async fn load_invalid_token_addresses(
980        &self,
981        chain_id: u32,
982    ) -> anyhow::Result<Vec<Address>> {
983        sqlx::query_as::<_, (String,)>(
984            "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
985        )
986        .bind(chain_id as i32)
987        .fetch_all(&self.pool)
988        .await?
989        .into_iter()
990        .map(|(address,)| validate_address(&address))
991        .collect::<Result<Vec<_>, _>>()
992        .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
993    }
994
995    /// Loads pool data from the database for the specified chain and DEX.
996    ///
997    /// # Errors
998    ///
999    /// Returns an error if the database query fails, the connection to the database is lost, or the query parameters are invalid.
1000    pub async fn load_pools(
1001        &self,
1002        chain: SharedChain,
1003        dex_id: &str,
1004    ) -> anyhow::Result<Vec<PoolRow>> {
1005        sqlx::query_as::<_, PoolRow>(
1006            r"
1007            SELECT
1008                address,
1009                pool_identifier,
1010                dex_name,
1011                creation_block,
1012                token0_chain,
1013                token0_address,
1014                token1_chain,
1015                token1_address,
1016                fee,
1017                tick_spacing,
1018                initial_tick,
1019                initial_sqrt_price_x96,
1020                hook_address
1021            FROM pool
1022            WHERE chain_id = $1 AND dex_name = $2
1023            ORDER BY creation_block ASC
1024        ",
1025        )
1026        .bind(chain.chain_id as i32)
1027        .bind(dex_id)
1028        .fetch_all(&self.pool)
1029        .await
1030        .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
1031    }
1032
1033    /// Toggles performance optimization settings for sync operations.
1034    ///
1035    /// When enabled (true), applies settings for maximum write performance:
1036    /// - `synchronous_commit` = OFF
1037    /// - `work_mem` increased for bulk operations
1038    ///
1039    /// When disabled (false), restores default safe settings:
1040    /// - `synchronous_commit` = ON (data safety)
1041    /// - `work_mem` back to default
1042    ///
1043    /// # Errors
1044    ///
1045    /// Returns an error if the database operations fail.
1046    pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
1047        if enable {
1048            tracing::info!("Enabling performance sync settings for bulk operations");
1049
1050            // Set synchronous_commit to OFF for maximum write performance
1051            sqlx::query("SET synchronous_commit = OFF")
1052                .execute(&self.pool)
1053                .await
1054                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
1055
1056            // Increase work_mem for bulk operations
1057            sqlx::query("SET work_mem = '256MB'")
1058                .execute(&self.pool)
1059                .await
1060                .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
1061
1062            tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
1063        } else {
1064            tracing::info!("Restoring default safe database performance settings");
1065
1066            // Restore synchronous_commit to ON for data safety
1067            sqlx::query("SET synchronous_commit = ON")
1068                .execute(&self.pool)
1069                .await
1070                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
1071
1072            // Reset work_mem to default
1073            sqlx::query("RESET work_mem")
1074                .execute(&self.pool)
1075                .await
1076                .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
1077        }
1078
1079        Ok(())
1080    }
1081
1082    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
1083    ///
1084    /// # Errors
1085    ///
1086    /// Returns an error if the database operation fails.
1087    pub async fn update_dex_last_synced_block(
1088        &self,
1089        chain_id: u32,
1090        dex: &DexType,
1091        block_number: u64,
1092    ) -> anyhow::Result<()> {
1093        sqlx::query(
1094            r"
1095            UPDATE dex
1096            SET last_full_sync_pools_block_number = $3
1097            WHERE chain_id = $1 AND name = $2
1098            ",
1099        )
1100        .bind(chain_id as i32)
1101        .bind(dex.to_string())
1102        .bind(block_number as i64)
1103        .execute(&self.pool)
1104        .await
1105        .map(|_| ())
1106        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1107    }
1108
1109    /// Updates the last synced block number for a pool.
1110    ///
1111    /// # Errors
1112    ///
1113    /// Returns an error if the database update fails.
1114    pub async fn update_pool_last_synced_block(
1115        &self,
1116        chain_id: u32,
1117        dex: &DexType,
1118        pool_identifier: &PoolIdentifier,
1119        block_number: u64,
1120    ) -> anyhow::Result<()> {
1121        sqlx::query(
1122            r"
1123            UPDATE pool
1124            SET last_full_sync_block_number = $4
1125            WHERE chain_id = $1
1126            AND dex_name = $2
1127            AND pool_identifier = $3
1128            ",
1129        )
1130        .bind(chain_id as i32)
1131        .bind(dex.to_string())
1132        .bind(pool_identifier.as_ref())
1133        .bind(block_number as i64)
1134        .execute(&self.pool)
1135        .await
1136        .map(|_| ())
1137        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1138    }
1139
1140    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
1141    ///
1142    /// # Errors
1143    ///
1144    /// Returns an error if the database query fails.
1145    pub async fn get_dex_last_synced_block(
1146        &self,
1147        chain_id: u32,
1148        dex: &DexType,
1149    ) -> anyhow::Result<Option<u64>> {
1150        let result = sqlx::query_as::<_, (Option<i64>,)>(
1151            r#"
1152            SELECT
1153                last_full_sync_pools_block_number
1154            FROM dex
1155            WHERE chain_id = $1
1156            AND name = $2
1157            "#,
1158        )
1159        .bind(chain_id as i32)
1160        .bind(dex.to_string())
1161        .fetch_optional(&self.pool)
1162        .await
1163        .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1164
1165        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1166    }
1167
1168    /// Retrieves the last synced block number for a pool.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the database query fails.
1173    pub async fn get_pool_last_synced_block(
1174        &self,
1175        chain_id: u32,
1176        dex: &DexType,
1177        pool_identifier: &PoolIdentifier,
1178    ) -> anyhow::Result<Option<u64>> {
1179        let result = sqlx::query_as::<_, (Option<i64>,)>(
1180            r#"
1181            SELECT
1182                last_full_sync_block_number
1183            FROM pool
1184            WHERE chain_id = $1
1185            AND dex_name = $2
1186            AND pool_identifier = $3
1187            "#,
1188        )
1189        .bind(chain_id as i32)
1190        .bind(dex.to_string())
1191        .bind(pool_identifier.as_ref())
1192        .fetch_optional(&self.pool)
1193        .await
1194        .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1195
1196        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1197    }
1198
1199    /// Retrieves the maximum block number from a specific table for a given pool.
1200    /// This is useful to detect orphaned data where events were inserted but progress wasn't updated.
1201    ///
1202    /// # Errors
1203    ///
1204    /// Returns an error if the database query fails.
1205    pub async fn get_table_last_block(
1206        &self,
1207        chain_id: u32,
1208        table_name: &str,
1209        pool_identifier: &PoolIdentifier,
1210    ) -> anyhow::Result<Option<u64>> {
1211        let query = format!(
1212            "SELECT MAX(block) FROM {table_name} WHERE chain_id = $1 AND pool_identifier = $2"
1213        );
1214        let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1215            .bind(chain_id as i32)
1216            .bind(pool_identifier.as_ref())
1217            .fetch_optional(&self.pool)
1218            .await
1219            .map_err(|e| anyhow::anyhow!("Failed to get table last block for {table_name}: {e}"))?;
1220
1221        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1222    }
1223
1224    /// Adds a batch of pool fee collect events to the database using batch operations.
1225    ///
1226    /// # Errors
1227    ///
1228    /// Returns an error if the database operation fails.
1229    pub async fn add_pool_collects_batch(
1230        &self,
1231        chain_id: u32,
1232        collects: &[PoolFeeCollect],
1233    ) -> anyhow::Result<()> {
1234        if collects.is_empty() {
1235            return Ok(());
1236        }
1237
1238        // Prepare vectors for each column
1239        let len = collects.len();
1240        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1241        let mut dex_names: Vec<String> = Vec::with_capacity(len);
1242        let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1243        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1244        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1245        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1246        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1247        let mut owners: Vec<String> = Vec::with_capacity(len);
1248        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1249        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1250        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1251        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1252
1253        // Fill vectors from collects
1254        for collect in collects {
1255            chain_ids.push(chain_id as i32);
1256            dex_names.push(collect.dex.name.to_string());
1257            pool_identifiers.push(collect.pool_identifier.to_string());
1258            blocks.push(collect.block as i64);
1259            transaction_hashes.push(collect.transaction_hash.clone());
1260            transaction_indices.push(collect.transaction_index as i32);
1261            log_indices.push(collect.log_index as i32);
1262            owners.push(collect.owner.to_string());
1263            amount0s.push(collect.amount0.to_string());
1264            amount1s.push(collect.amount1.to_string());
1265            tick_lowers.push(collect.tick_lower);
1266            tick_uppers.push(collect.tick_upper);
1267        }
1268
1269        // Execute batch insert with UNNEST
1270        sqlx::query(
1271            r"
1272            INSERT INTO pool_collect_event (
1273                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1274                log_index, owner, amount0, amount1, tick_lower, tick_upper
1275            )
1276            SELECT
1277                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1278                log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1279            FROM UNNEST(
1280                $1::INT[], $2::TEXT[], $3::TEXT[], $4::INT[], $5::TEXT[], $6::INT[],
1281                $7::INT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::INT[], $12::INT[]
1282            ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1283                   log_index, owner, amount0, amount1, tick_lower, tick_upper)
1284            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1285           ",
1286        )
1287        .bind(&chain_ids[..])
1288        .bind(&dex_names[..])
1289        .bind(&pool_identifiers[..])
1290        .bind(&blocks[..])
1291        .bind(&transaction_hashes[..])
1292        .bind(&transaction_indices[..])
1293        .bind(&log_indices[..])
1294        .bind(&owners[..])
1295        .bind(&amount0s[..])
1296        .bind(&amount1s[..])
1297        .bind(&tick_lowers[..])
1298        .bind(&tick_uppers[..])
1299        .execute(&self.pool)
1300        .await
1301        .map(|_| ())
1302        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1303    }
1304
1305    /// Inserts multiple pool flash events in a single database operation using UNNEST for optimal performance.
1306    ///
1307    /// # Errors
1308    ///
1309    /// Returns an error if the database operation fails.
1310    pub async fn add_pool_flash_batch(
1311        &self,
1312        chain_id: u32,
1313        flash_events: &[PoolFlash],
1314    ) -> anyhow::Result<()> {
1315        if flash_events.is_empty() {
1316            return Ok(());
1317        }
1318
1319        // Prepare vectors for each column
1320        let len = flash_events.len();
1321        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1322        let mut dex_names: Vec<String> = Vec::with_capacity(len);
1323        let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1324        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1325        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1326        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1327        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1328        let mut senders: Vec<String> = Vec::with_capacity(len);
1329        let mut recipients: Vec<String> = Vec::with_capacity(len);
1330        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1331        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1332        let mut paid0s: Vec<String> = Vec::with_capacity(len);
1333        let mut paid1s: Vec<String> = Vec::with_capacity(len);
1334
1335        // Fill vectors from flash events
1336        for flash in flash_events {
1337            chain_ids.push(chain_id as i32);
1338            dex_names.push(flash.dex.name.to_string());
1339            pool_identifiers.push(flash.pool_identifier.to_string());
1340            blocks.push(flash.block as i64);
1341            transaction_hashes.push(flash.transaction_hash.clone());
1342            transaction_indices.push(flash.transaction_index as i32);
1343            log_indices.push(flash.log_index as i32);
1344            senders.push(flash.sender.to_string());
1345            recipients.push(flash.recipient.to_string());
1346            amount0s.push(flash.amount0.to_string());
1347            amount1s.push(flash.amount1.to_string());
1348            paid0s.push(flash.paid0.to_string());
1349            paid1s.push(flash.paid1.to_string());
1350        }
1351
1352        // Execute batch insert with UNNEST
1353        sqlx::query(
1354            r"
1355            INSERT INTO pool_flash_event (
1356                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1357                log_index, sender, recipient, amount0, amount1, paid0, paid1
1358            )
1359            SELECT
1360                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1361                log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1362            FROM UNNEST(
1363                $1::INT[], $2::TEXT[], $3::TEXT[], $4::INT[], $5::TEXT[], $6::INT[],
1364                $7::INT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[], $13::TEXT[]
1365            ) AS t(chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
1366                   log_index, sender, recipient, amount0, amount1, paid0, paid1)
1367            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1368           ",
1369        )
1370        .bind(&chain_ids[..])
1371        .bind(&dex_names[..])
1372        .bind(&pool_identifiers[..])
1373        .bind(&blocks[..])
1374        .bind(&transaction_hashes[..])
1375        .bind(&transaction_indices[..])
1376        .bind(&log_indices[..])
1377        .bind(&senders[..])
1378        .bind(&recipients[..])
1379        .bind(&amount0s[..])
1380        .bind(&amount1s[..])
1381        .bind(&paid0s[..])
1382        .bind(&paid1s[..])
1383        .execute(&self.pool)
1384        .await
1385        .map(|_| ())
1386        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1387    }
1388
1389    /// Adds a pool snapshot to the database.
1390    ///
1391    /// # Errors
1392    ///
1393    /// Returns an error if the database insert fails.
1394    pub async fn add_pool_snapshot(
1395        &self,
1396        chain_id: u32,
1397        dex_name: &DexType,
1398        pool_identifier: &PoolIdentifier,
1399        snapshot: &PoolSnapshot,
1400    ) -> anyhow::Result<()> {
1401        sqlx::query(
1402            r"
1403            INSERT INTO pool_snapshot (
1404                chain_id, dex_name, pool_identifier, block, transaction_index, log_index, transaction_hash,
1405                current_tick, price_sqrt_ratio_x96, liquidity,
1406                protocol_fees_token0, protocol_fees_token1, fee_protocol,
1407                fee_growth_global_0, fee_growth_global_1,
1408                total_amount0_deposited, total_amount1_deposited,
1409                total_amount0_collected, total_amount1_collected,
1410                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1411                liquidity_utilization_rate
1412            ) VALUES (
1413                $1, $2, $3, $4, $5, $6, $7,
1414                $8, $9::U160, $10::U128, $11::U256, $12::U256, $13,
1415                $14::U256, $15::U256, $16::U256, $17::U256, $18::U256, $19::U256,
1416                $20, $21, $22, $23, $24, $25
1417            )
1418            ON CONFLICT (chain_id, pool_identifier, block, transaction_index, log_index)
1419            DO NOTHING
1420            ",
1421        )
1422        .bind(chain_id as i32)
1423        .bind(dex_name.to_string())
1424        .bind(pool_identifier.as_ref())
1425        .bind(snapshot.block_position.number as i64)
1426        .bind(snapshot.block_position.transaction_index as i32)
1427        .bind(snapshot.block_position.log_index as i32)
1428        .bind(snapshot.block_position.transaction_hash.clone())
1429        .bind(snapshot.state.current_tick)
1430        .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1431        .bind(snapshot.state.liquidity.to_string())
1432        .bind(snapshot.state.protocol_fees_token0.to_string())
1433        .bind(snapshot.state.protocol_fees_token1.to_string())
1434        .bind(snapshot.state.fee_protocol as i16)
1435        .bind(snapshot.state.fee_growth_global_0.to_string())
1436        .bind(snapshot.state.fee_growth_global_1.to_string())
1437        .bind(snapshot.analytics.total_amount0_deposited.to_string())
1438        .bind(snapshot.analytics.total_amount1_deposited.to_string())
1439        .bind(snapshot.analytics.total_amount0_collected.to_string())
1440        .bind(snapshot.analytics.total_amount1_collected.to_string())
1441        .bind(snapshot.analytics.total_swaps as i32)
1442        .bind(snapshot.analytics.total_mints as i32)
1443        .bind(snapshot.analytics.total_burns as i32)
1444        .bind(snapshot.analytics.total_fee_collects as i32)
1445        .bind(snapshot.analytics.total_flashes as i32)
1446        .bind(snapshot.analytics.liquidity_utilization_rate)
1447        .execute(&self.pool)
1448        .await
1449        .map(|_| ())
1450        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1451    }
1452
1453    /// Inserts multiple pool positions in a single database operation using UNNEST for optimal performance.
1454    ///
1455    /// # Errors
1456    ///
1457    /// Returns an error if the database operation fails.
1458    pub async fn add_pool_positions_batch(
1459        &self,
1460        chain_id: u32,
1461        snapshot_block: u64,
1462        snapshot_transaction_index: u32,
1463        snapshot_log_index: u32,
1464        positions: &[(PoolIdentifier, PoolPosition)],
1465    ) -> anyhow::Result<()> {
1466        if positions.is_empty() {
1467            return Ok(());
1468        }
1469
1470        // Prepare vectors for each column
1471        let len = positions.len();
1472        let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1473        let mut owners: Vec<String> = Vec::with_capacity(len);
1474        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1475        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1476        let mut liquidities: Vec<String> = Vec::with_capacity(len);
1477        let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1478        let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1479        let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1480        let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1481        let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1482        let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1483        let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1484        let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1485
1486        // Fill vectors from positions
1487        for (pool_address, position) in positions {
1488            pool_identifiers.push(pool_address.to_string());
1489            owners.push(position.owner.to_string());
1490            tick_lowers.push(position.tick_lower);
1491            tick_uppers.push(position.tick_upper);
1492            liquidities.push(position.liquidity.to_string());
1493            fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1494            fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1495            tokens_owed_0s.push(position.tokens_owed_0.to_string());
1496            tokens_owed_1s.push(position.tokens_owed_1.to_string());
1497            total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1498            total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1499            total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1500            total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1501        }
1502
1503        // Execute batch insert with UNNEST
1504        sqlx::query(
1505            r"
1506            INSERT INTO pool_position (
1507                chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1508                owner, tick_lower, tick_upper,
1509                liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1510                tokens_owed_0, tokens_owed_1,
1511                total_amount0_deposited, total_amount1_deposited,
1512                total_amount0_collected, total_amount1_collected
1513            )
1514            SELECT
1515                $1, pool_identifier, $2, $3, $4,
1516                owner, tick_lower, tick_upper,
1517                liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1518                tokens_owed_0::U128, tokens_owed_1::U128,
1519                total_amount0_deposited::U256, total_amount1_deposited::U256,
1520                total_amount0_collected::U128, total_amount1_collected::U128
1521            FROM UNNEST(
1522                $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1523                $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1524                $16::TEXT[], $17::TEXT[]
1525            ) AS t(pool_identifier, owner, tick_lower, tick_upper,
1526                   liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1527                   tokens_owed_0, tokens_owed_1,
1528                   total_amount0_deposited, total_amount1_deposited,
1529                   total_amount0_collected, total_amount1_collected)
1530            ON CONFLICT (chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1531            DO NOTHING
1532           ",
1533        )
1534        .bind(chain_id as i32)
1535        .bind(snapshot_block as i64)
1536        .bind(snapshot_transaction_index as i32)
1537        .bind(snapshot_log_index as i32)
1538        .bind(&pool_identifiers[..])
1539        .bind(&owners[..])
1540        .bind(&tick_lowers[..])
1541        .bind(&tick_uppers[..])
1542        .bind(&liquidities[..])
1543        .bind(&fee_growth_inside_0_lasts[..])
1544        .bind(&fee_growth_inside_1_lasts[..])
1545        .bind(&tokens_owed_0s[..])
1546        .bind(&tokens_owed_1s[..])
1547        .bind(&total_amount0_depositeds as &[Option<String>])
1548        .bind(&total_amount1_depositeds as &[Option<String>])
1549        .bind(&total_amount0_collecteds as &[Option<String>])
1550        .bind(&total_amount1_collecteds as &[Option<String>])
1551        .execute(&self.pool)
1552        .await
1553        .map(|_| ())
1554        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1555    }
1556
1557    /// Inserts multiple pool ticks in a single database operation using UNNEST for optimal performance.
1558    ///
1559    /// # Errors
1560    ///
1561    /// Returns an error if the database operation fails.
1562    pub async fn add_pool_ticks_batch(
1563        &self,
1564        chain_id: u32,
1565        snapshot_block: u64,
1566        snapshot_transaction_index: u32,
1567        snapshot_log_index: u32,
1568        ticks: &[(PoolIdentifier, &PoolTick)],
1569    ) -> anyhow::Result<()> {
1570        if ticks.is_empty() {
1571            return Ok(());
1572        }
1573
1574        // Prepare vectors for each column
1575        let len = ticks.len();
1576        let mut pool_identifiers: Vec<String> = Vec::with_capacity(len);
1577        let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1578        let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1579        let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1580        let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1581        let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1582        let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1583        let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1584
1585        // Fill vectors from ticks
1586        for (pool_address, tick) in ticks {
1587            pool_identifiers.push(pool_address.to_string());
1588            tick_values.push(tick.value);
1589            liquidity_grosses.push(tick.liquidity_gross.to_string());
1590            liquidity_nets.push(tick.liquidity_net.to_string());
1591            fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1592            fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1593            initializeds.push(tick.initialized);
1594            last_updated_blocks.push(tick.last_updated_block as i64);
1595        }
1596
1597        // Execute batch insert with UNNEST
1598        sqlx::query(
1599            r"
1600            INSERT INTO pool_tick (
1601                chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1602                tick_value, liquidity_gross, liquidity_net,
1603                fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1604            )
1605            SELECT
1606                $1, pool_identifier, $2, $3, $4,
1607                tick_value, liquidity_gross::U128, liquidity_net::I128,
1608                fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1609            FROM UNNEST(
1610                $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1611                $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1612            ) AS t(pool_identifier, tick_value, liquidity_gross, liquidity_net,
1613                   fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1614            ON CONFLICT (chain_id, pool_identifier, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1615            DO NOTHING
1616           ",
1617        )
1618        .bind(chain_id as i32)
1619        .bind(snapshot_block as i64)
1620        .bind(snapshot_transaction_index as i32)
1621        .bind(snapshot_log_index as i32)
1622        .bind(&pool_identifiers[..])
1623        .bind(&tick_values[..])
1624        .bind(&liquidity_grosses[..])
1625        .bind(&liquidity_nets[..])
1626        .bind(&fee_growth_outside_0s[..])
1627        .bind(&fee_growth_outside_1s[..])
1628        .bind(&initializeds[..])
1629        .bind(&last_updated_blocks[..])
1630        .execute(&self.pool)
1631        .await
1632        .map(|_| ())
1633        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1634    }
1635
1636    /// Updates the initial price and tick for a pool.
1637    ///
1638    /// # Errors
1639    ///
1640    /// Returns an error if the database update fails.
1641    pub async fn update_pool_initial_price_tick(
1642        &self,
1643        chain_id: u32,
1644        initialize_event: &InitializeEvent,
1645    ) -> anyhow::Result<()> {
1646        sqlx::query(
1647            r"
1648            UPDATE pool
1649            SET
1650                initial_tick = $4,
1651                initial_sqrt_price_x96 = $5
1652            WHERE chain_id = $1
1653            AND dex_name = $2
1654            AND pool_identifier = $3
1655            ",
1656        )
1657        .bind(chain_id as i32)
1658        .bind(initialize_event.dex.name.to_string())
1659        .bind(initialize_event.pool_identifier.as_ref())
1660        .bind(initialize_event.tick)
1661        .bind(initialize_event.sqrt_price_x96.to_string())
1662        .execute(&self.pool)
1663        .await
1664        .map(|_| ())
1665        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1666    }
1667
1668    /// Loads the latest valid pool snapshot from the database.
1669    ///
1670    /// Returns the most recent snapshot that has been validated against on-chain state.
1671    ///
1672    /// # Errors
1673    ///
1674    /// Returns an error if the database query fails.
1675    pub async fn load_latest_valid_pool_snapshot(
1676        &self,
1677        chain_id: u32,
1678        pool_identifier: &PoolIdentifier,
1679    ) -> anyhow::Result<Option<PoolSnapshot>> {
1680        let result = sqlx::query(
1681            r"
1682            SELECT
1683                block, transaction_index, log_index, transaction_hash,
1684                current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1685                protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1686                fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1687                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1688                total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1689                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1690                liquidity_utilization_rate,
1691                (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1692            FROM pool_snapshot
1693            WHERE chain_id = $1 AND pool_identifier = $2 AND is_valid = TRUE
1694            ORDER BY block DESC, transaction_index DESC, log_index DESC
1695            LIMIT 1
1696            ",
1697        )
1698        .bind(chain_id as i32)
1699        .bind(pool_identifier.as_ref())
1700        .fetch_optional(&self.pool)
1701        .await
1702        .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {e}"))?;
1703
1704        if let Some(row) = result {
1705            // Parse snapshot state
1706            let block: i64 = row.get("block");
1707            let transaction_index: i32 = row.get("transaction_index");
1708            let log_index: i32 = row.get("log_index");
1709            let transaction_hash: String = row.get("transaction_hash");
1710
1711            let block_position = BlockPosition::new(
1712                block as u64,
1713                transaction_hash,
1714                transaction_index as u32,
1715                log_index as u32,
1716            );
1717
1718            let state = PoolState {
1719                current_tick: row.get("current_tick"),
1720                price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1721                liquidity: row.get::<String, _>("liquidity").parse()?,
1722                protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1723                protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1724                fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1725                fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1726                fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1727            };
1728
1729            let analytics = PoolAnalytics {
1730                total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1731                total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1732                total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1733                total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1734                total_swaps: row.get::<i32, _>("total_swaps") as u64,
1735                total_mints: row.get::<i32, _>("total_mints") as u64,
1736                total_burns: row.get::<i32, _>("total_burns") as u64,
1737                total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1738                total_flashes: row.get::<i32, _>("total_flashes") as u64,
1739                liquidity_utilization_rate: row.get::<f64, _>("liquidity_utilization_rate"),
1740            };
1741
1742            // Load positions and ticks
1743            let positions = self
1744                .load_pool_positions_for_snapshot(
1745                    chain_id,
1746                    pool_identifier,
1747                    block as u64,
1748                    transaction_index as u32,
1749                    log_index as u32,
1750                )
1751                .await?;
1752
1753            let ticks = self
1754                .load_pool_ticks_for_snapshot(
1755                    chain_id,
1756                    pool_identifier,
1757                    block as u64,
1758                    transaction_index as u32,
1759                    log_index as u32,
1760                )
1761                .await?;
1762
1763            let dex_name: String = row.get("dex_name");
1764            let chain = Chain::from_chain_id(chain_id)
1765                .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {chain_id}"))?;
1766
1767            let dex_type = DexType::from_dex_name(&dex_name)
1768                .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {dex_name}"))?;
1769
1770            let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1771                .ok_or_else(|| {
1772                    anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1773                })?;
1774
1775            let instrument_id =
1776                Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_identifier.as_ref());
1777
1778            Ok(Some(PoolSnapshot::new(
1779                instrument_id,
1780                state,
1781                positions,
1782                ticks,
1783                analytics,
1784                block_position,
1785            )))
1786        } else {
1787            Ok(None)
1788        }
1789    }
1790
1791    /// Marks a pool snapshot as valid after successful on-chain verification.
1792    ///
1793    /// # Errors
1794    ///
1795    /// Returns an error if the database operation fails.
1796    pub async fn mark_pool_snapshot_valid(
1797        &self,
1798        chain_id: u32,
1799        pool_identifier: &PoolIdentifier,
1800        block: u64,
1801        transaction_index: u32,
1802        log_index: u32,
1803    ) -> anyhow::Result<()> {
1804        sqlx::query(
1805            r"
1806            UPDATE pool_snapshot
1807            SET is_valid = TRUE
1808            WHERE chain_id = $1
1809            AND pool_identifier = $2
1810            AND block = $3
1811            AND transaction_index = $4
1812            AND log_index = $5
1813            ",
1814        )
1815        .bind(chain_id as i32)
1816        .bind(pool_identifier.as_ref())
1817        .bind(block as i64)
1818        .bind(transaction_index as i32)
1819        .bind(log_index as i32)
1820        .execute(&self.pool)
1821        .await
1822        .map(|_| ())
1823        .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {e}"))
1824    }
1825
1826    /// Loads all positions for a specific snapshot.
1827    ///
1828    /// # Errors
1829    ///
1830    /// Returns an error if the database query fails.
1831    pub async fn load_pool_positions_for_snapshot(
1832        &self,
1833        chain_id: u32,
1834        pool_identifier: &PoolIdentifier,
1835        snapshot_block: u64,
1836        snapshot_transaction_index: u32,
1837        snapshot_log_index: u32,
1838    ) -> anyhow::Result<Vec<PoolPosition>> {
1839        let rows = sqlx::query(
1840            r"
1841            SELECT
1842                owner, tick_lower, tick_upper,
1843                liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1844                tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1845                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1846                total_amount0_collected::TEXT, total_amount1_collected::TEXT
1847            FROM pool_position
1848            WHERE chain_id = $1
1849            AND pool_identifier = $2
1850            AND snapshot_block = $3
1851            AND snapshot_transaction_index = $4
1852            AND snapshot_log_index = $5
1853            ",
1854        )
1855        .bind(chain_id as i32)
1856        .bind(pool_identifier.as_ref())
1857        .bind(snapshot_block as i64)
1858        .bind(snapshot_transaction_index as i32)
1859        .bind(snapshot_log_index as i32)
1860        .fetch_all(&self.pool)
1861        .await
1862        .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {e}"))?;
1863
1864        rows.iter()
1865            .map(|row| {
1866                let owner: String = row.get("owner");
1867                let position = PoolPosition {
1868                    owner: validate_address(&owner)?,
1869                    tick_lower: row.get("tick_lower"),
1870                    tick_upper: row.get("tick_upper"),
1871                    liquidity: row.get::<String, _>("liquidity").parse()?,
1872                    fee_growth_inside_0_last: row
1873                        .get::<String, _>("fee_growth_inside_0_last")
1874                        .parse()?,
1875                    fee_growth_inside_1_last: row
1876                        .get::<String, _>("fee_growth_inside_1_last")
1877                        .parse()?,
1878                    tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1879                    tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1880                    total_amount0_deposited: row
1881                        .get::<String, _>("total_amount0_deposited")
1882                        .parse()?,
1883                    total_amount1_deposited: row
1884                        .get::<String, _>("total_amount1_deposited")
1885                        .parse()?,
1886                    total_amount0_collected: row
1887                        .get::<String, _>("total_amount0_collected")
1888                        .parse()?,
1889                    total_amount1_collected: row
1890                        .get::<String, _>("total_amount1_collected")
1891                        .parse()?,
1892                };
1893                Ok(position)
1894            })
1895            .collect()
1896    }
1897
1898    /// Loads all ticks for a specific snapshot.
1899    ///
1900    /// # Errors
1901    ///
1902    /// Returns an error if the database query fails.
1903    pub async fn load_pool_ticks_for_snapshot(
1904        &self,
1905        chain_id: u32,
1906        pool_identifier: &PoolIdentifier,
1907        snapshot_block: u64,
1908        snapshot_transaction_index: u32,
1909        snapshot_log_index: u32,
1910    ) -> anyhow::Result<Vec<PoolTick>> {
1911        let rows = sqlx::query(
1912            r"
1913            SELECT
1914                tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1915                fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1916                last_updated_block
1917            FROM pool_tick
1918            WHERE chain_id = $1
1919            AND pool_identifier = $2
1920            AND snapshot_block = $3
1921            AND snapshot_transaction_index = $4
1922            AND snapshot_log_index = $5
1923            ",
1924        )
1925        .bind(chain_id as i32)
1926        .bind(pool_identifier.as_ref())
1927        .bind(snapshot_block as i64)
1928        .bind(snapshot_transaction_index as i32)
1929        .bind(snapshot_log_index as i32)
1930        .fetch_all(&self.pool)
1931        .await
1932        .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {e}"))?;
1933
1934        rows.iter()
1935            .map(|row| {
1936                let tick = PoolTick::new(
1937                    row.get("tick_value"),
1938                    row.get::<String, _>("liquidity_gross").parse()?,
1939                    row.get::<String, _>("liquidity_net").parse()?,
1940                    row.get::<String, _>("fee_growth_outside_0").parse()?,
1941                    row.get::<String, _>("fee_growth_outside_1").parse()?,
1942                    row.get("initialized"),
1943                    row.get::<i64, _>("last_updated_block") as u64,
1944                );
1945                Ok(tick)
1946            })
1947            .collect()
1948    }
1949
1950    /// Streams pool events from all event tables (swap, liquidity, collect) for a specific pool.
1951    ///
1952    /// Creates a unified stream of pool events from multiple tables, ordering them chronologically
1953    /// by block number, transaction index, and log index. Optionally resumes from a specific block position.
1954    ///
1955    /// # Returns
1956    ///
1957    /// A stream of `DexPoolData` events in chronological order.
1958    ///
1959    /// # Errors
1960    ///
1961    /// Returns an error if the database query fails or if event transformation fails.
1962    pub fn stream_pool_events<'a>(
1963        &'a self,
1964        chain: SharedChain,
1965        dex: SharedDex,
1966        instrument_id: InstrumentId,
1967        pool_identifier: PoolIdentifier,
1968        from_position: Option<BlockPosition>,
1969    ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1970        // Query without position filter (streams all events)
1971        const QUERY_ALL: &str = r"
1972            (SELECT
1973                'swap' as event_type,
1974                chain_id,
1975                pool_identifier,
1976                block,
1977                transaction_hash,
1978                transaction_index,
1979                log_index,
1980                sender,
1981                recipient,
1982                NULL::TEXT as owner,
1983                sqrt_price_x96::TEXT,
1984                liquidity::TEXT as swap_liquidity,
1985                tick as swap_tick,
1986                amount0::TEXT as swap_amount0,
1987                amount1::TEXT as swap_amount1,
1988                NULL::TEXT as position_liquidity,
1989                NULL::TEXT as amount0,
1990                NULL::TEXT as amount1,
1991                NULL::INT as tick_lower,
1992                NULL::INT as tick_upper,
1993                NULL::TEXT as liquidity_event_type,
1994                NULL::TEXT as flash_amount0,
1995                NULL::TEXT as flash_amount1,
1996                NULL::TEXT as flash_paid0,
1997                NULL::TEXT as flash_paid1
1998            FROM pool_swap_event
1999            WHERE chain_id = $1 AND pool_identifier = $2)
2000            UNION ALL
2001            (SELECT
2002                'liquidity' as event_type,
2003                chain_id,
2004                pool_identifier,
2005                block,
2006                transaction_hash,
2007                transaction_index,
2008                log_index,
2009                sender,
2010                NULL::TEXT as recipient,
2011                owner,
2012                NULL::text as sqrt_price_x96,
2013                NULL::TEXT as swap_liquidity,
2014                NULL::INT as swap_tick,
2015                amount0::TEXT as swap_amount0,
2016                amount1::TEXT as swap_amount1,
2017                position_liquidity::TEXT,
2018                amount0::TEXT,
2019                amount1::TEXT,
2020                tick_lower::INT,
2021                tick_upper::INT,
2022                event_type as liquidity_event_type,
2023                NULL::TEXT as flash_amount0,
2024                NULL::TEXT as flash_amount1,
2025                NULL::TEXT as flash_paid0,
2026                NULL::TEXT as flash_paid1
2027            FROM pool_liquidity_event
2028            WHERE chain_id = $1 AND pool_identifier = $2)
2029            UNION ALL
2030            (SELECT
2031                'collect' as event_type,
2032                chain_id,
2033                pool_identifier,
2034                block,
2035                transaction_hash,
2036                transaction_index,
2037                log_index,
2038                NULL::TEXT as sender,
2039                NULL::TEXT as recipient,
2040                owner,
2041                NULL::TEXT as sqrt_price_x96,
2042                NULL::TEXT as swap_liquidity,
2043                NULL::INT AS swap_tick,
2044                amount0::TEXT as swap_amount0,
2045                amount1::TEXT as swap_amount1,
2046                NULL::TEXT as position_liquidity,
2047                amount0::TEXT,
2048                amount1::TEXT,
2049                tick_lower::INT,
2050                tick_upper::INT,
2051                NULL::TEXT as liquidity_event_type,
2052                NULL::TEXT as flash_amount0,
2053                NULL::TEXT as flash_amount1,
2054                NULL::TEXT as flash_paid0,
2055                NULL::TEXT as flash_paid1
2056            FROM pool_collect_event
2057            WHERE chain_id = $1 AND pool_identifier = $2)
2058            UNION ALL
2059            (SELECT
2060                'flash' as event_type,
2061                chain_id,
2062                pool_identifier,
2063                block,
2064                transaction_hash,
2065                transaction_index,
2066                log_index,
2067                sender,
2068                recipient,
2069                NULL::TEXT as owner,
2070                NULL::TEXT as sqrt_price_x96,
2071                NULL::TEXT as swap_liquidity,
2072                NULL::INT AS swap_tick,
2073                NULL::TEXT as swap_amount0,
2074                NULL::TEXT as swap_amount1,
2075                NULL::TEXT as position_liquidity,
2076                NULL::TEXT as amount0,
2077                NULL::TEXT as amount1,
2078                NULL::INT as tick_lower,
2079                NULL::INT as tick_upper,
2080                NULL::TEXT as liquidity_event_type,
2081                amount0::TEXT as flash_amount0,
2082                amount1::TEXT as flash_amount1,
2083                paid0::TEXT as flash_paid0,
2084                paid1::TEXT as flash_paid1
2085            FROM pool_flash_event
2086            WHERE chain_id = $1 AND pool_identifier = $2)
2087            ORDER BY block, transaction_index, log_index";
2088
2089        // Query with position filter (resumes from specific block position)
2090        const QUERY_FROM_POSITION: &str = r"
2091            (SELECT
2092                'swap' as event_type,
2093                chain_id,
2094                pool_identifier,
2095                block,
2096                transaction_hash,
2097                transaction_index,
2098                log_index,
2099                sender,
2100                recipient,
2101                NULL::TEXT as owner,
2102                sqrt_price_x96::TEXT,
2103                liquidity::TEXT as swap_liquidity,
2104                tick as swap_tick,
2105                amount0::TEXT as swap_amount0,
2106                amount1::TEXT as swap_amount1,
2107                NULL::TEXT as position_liquidity,
2108                NULL::TEXT as amount0,
2109                NULL::TEXT as amount1,
2110                NULL::INT as tick_lower,
2111                NULL::INT as tick_upper,
2112                NULL::TEXT as liquidity_event_type,
2113                NULL::TEXT as flash_amount0,
2114                NULL::TEXT as flash_amount1,
2115                NULL::TEXT as flash_paid0,
2116                NULL::TEXT as flash_paid1
2117            FROM pool_swap_event
2118            WHERE chain_id = $1 AND pool_identifier = $2
2119            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2120            UNION ALL
2121            (SELECT
2122                'liquidity' as event_type,
2123                chain_id,
2124                pool_identifier,
2125                block,
2126                transaction_hash,
2127                transaction_index,
2128                log_index,
2129                sender,
2130                NULL::TEXT as recipient,
2131                owner,
2132                NULL::text as sqrt_price_x96,
2133                NULL::TEXT as swap_liquidity,
2134                NULL::INT as swap_tick,
2135                amount0::TEXT as swap_amount0,
2136                amount1::TEXT as swap_amount1,
2137                position_liquidity::TEXT,
2138                amount0::TEXT,
2139                amount1::TEXT,
2140                tick_lower::INT,
2141                tick_upper::INT,
2142                event_type as liquidity_event_type,
2143                NULL::TEXT as flash_amount0,
2144                NULL::TEXT as flash_amount1,
2145                NULL::TEXT as flash_paid0,
2146                NULL::TEXT as flash_paid1
2147            FROM pool_liquidity_event
2148            WHERE chain_id = $1 AND pool_identifier = $2
2149            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2150            UNION ALL
2151            (SELECT
2152                'collect' as event_type,
2153                chain_id,
2154                pool_identifier,
2155                block,
2156                transaction_hash,
2157                transaction_index,
2158                log_index,
2159                NULL::TEXT as sender,
2160                NULL::TEXT as recipient,
2161                owner,
2162                NULL::TEXT as sqrt_price_x96,
2163                NULL::TEXT as swap_liquidity,
2164                NULL::INT AS swap_tick,
2165                amount0::TEXT as swap_amount0,
2166                amount1::TEXT as swap_amount1,
2167                NULL::TEXT as position_liquidity,
2168                amount0::TEXT,
2169                amount1::TEXT,
2170                tick_lower::INT,
2171                tick_upper::INT,
2172                NULL::TEXT as liquidity_event_type,
2173                NULL::TEXT as flash_amount0,
2174                NULL::TEXT as flash_amount1,
2175                NULL::TEXT as flash_paid0,
2176                NULL::TEXT as flash_paid1
2177            FROM pool_collect_event
2178            WHERE chain_id = $1 AND pool_identifier = $2
2179            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2180            UNION ALL
2181            (SELECT
2182                'flash' as event_type,
2183                chain_id,
2184                pool_identifier,
2185                block,
2186                transaction_hash,
2187                transaction_index,
2188                log_index,
2189                sender,
2190                recipient,
2191                NULL::TEXT as owner,
2192                NULL::TEXT as sqrt_price_x96,
2193                NULL::TEXT as swap_liquidity,
2194                NULL::INT AS swap_tick,
2195                NULL::TEXT as swap_amount0,
2196                NULL::TEXT as swap_amount1,
2197                NULL::TEXT as position_liquidity,
2198                NULL::TEXT as amount0,
2199                NULL::TEXT as amount1,
2200                NULL::INT as tick_lower,
2201                NULL::INT as tick_upper,
2202                NULL::TEXT as liquidity_event_type,
2203                amount0::TEXT as flash_amount0,
2204                amount1::TEXT as flash_amount1,
2205                paid0::TEXT as flash_paid0,
2206                paid1::TEXT as flash_paid1
2207            FROM pool_flash_event
2208            WHERE chain_id = $1 AND pool_identifier = $2
2209            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2210            ORDER BY block, transaction_index, log_index";
2211
2212        // Build query with appropriate bindings
2213        let query = if let Some(pos) = from_position {
2214            sqlx::query(QUERY_FROM_POSITION)
2215                .bind(chain.chain_id as i32)
2216                .bind(pool_identifier.to_string())
2217                .bind(pos.number as i64)
2218                .bind(pos.transaction_index as i32)
2219                .bind(pos.log_index as i32)
2220                .fetch(&self.pool)
2221        } else {
2222            sqlx::query(QUERY_ALL)
2223                .bind(chain.chain_id as i32)
2224                .bind(pool_identifier.to_string())
2225                .fetch(&self.pool)
2226        };
2227
2228        // Transform rows to events
2229        let stream = query.map(move |row_result| match row_result {
2230            Ok(row) => {
2231                transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2232                    .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {e}"))
2233            }
2234            Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {e}")),
2235        });
2236
2237        Box::pin(stream)
2238    }
2239}