nautilus_blockchain/cache/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21    defi::{
22        Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
23        data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24        pool_analysis::{
25            position::PoolPosition,
26            snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
27        },
28        tick_map::tick::PoolTick,
29        validation::validate_address,
30    },
31    identifiers::InstrumentId,
32};
33use sqlx::{PgPool, Row, postgres::PgConnectOptions};
34
35use crate::{
36    cache::{
37        consistency::CachedBlocksConsistencyStatus,
38        copy::PostgresCopyHandler,
39        rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
40        types::{U128Pg, U256Pg},
41    },
42    events::initialize::InitializeEvent,
43};
44
45/// Database interface for persisting and retrieving blockchain entities and domain objects.
46#[derive(Debug)]
47pub struct BlockchainCacheDatabase {
48    /// PostgreSQL connection pool used for database operations.
49    pool: PgPool,
50}
51
52impl BlockchainCacheDatabase {
53    /// Initializes a new database instance by establishing a connection to PostgreSQL.
54    ///
55    /// # Panics
56    ///
57    /// Panics if unable to connect to PostgreSQL with the provided options.
58    pub async fn init(pg_options: PgConnectOptions) -> Self {
59        let pool = sqlx::postgres::PgPoolOptions::new()
60            .max_connections(32) // Increased from default 10
61            .min_connections(5) // Keep some connections warm
62            .acquire_timeout(std::time::Duration::from_secs(3))
63            .connect_with(pg_options)
64            .await
65            .expect("Error connecting to Postgres");
66        Self { pool }
67    }
68
69    /// Seeds the database with a blockchain chain record.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the database operation fails.
74    pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
75        sqlx::query(
76            r"
77            INSERT INTO chain (
78                chain_id, name
79            ) VALUES ($1,$2)
80            ON CONFLICT (chain_id)
81            DO NOTHING
82        ",
83        )
84        .bind(chain.chain_id as i32)
85        .bind(chain.name.to_string())
86        .execute(&self.pool)
87        .await
88        .map(|_| ())
89        .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
90    }
91
92    /// Creates a table partition for the block table specific to the given chain
93    /// by calling the existing PostgreSQL function `create_block_partition`.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the database operation fails.
98    pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
99        let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
100            .bind(chain.chain_id as i32)
101            .fetch_one(&self.pool)
102            .await
103            .map_err(|e| {
104                anyhow::anyhow!(
105                    "Failed to call create_block_partition for chain {}: {e}",
106                    chain.chain_id
107                )
108            })?;
109
110        Ok(result.0)
111    }
112
113    /// Creates a table partition for the token table specific to the given chain
114    /// by calling the existing PostgreSQL function `create_token_partition`.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the database operation fails.
119    pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
120        let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
121            .bind(chain.chain_id as i32)
122            .fetch_one(&self.pool)
123            .await
124            .map_err(|e| {
125                anyhow::anyhow!(
126                    "Failed to call create_token_partition for chain {}: {e}",
127                    chain.chain_id
128                )
129            })?;
130
131        Ok(result.0)
132    }
133
134    /// Returns the highest block number that maintains data continuity in the database.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if the database query fails.
139    pub async fn get_block_consistency_status(
140        &self,
141        chain: &Chain,
142    ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
143        tracing::info!("Fetching block consistency status");
144
145        let result: (i64, i64) = sqlx::query_as(
146            r"
147            SELECT
148                COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
149                get_last_continuous_block($1) as last_continuous_block
150            "
151        )
152        .bind(chain.chain_id as i32)
153        .fetch_one(&self.pool)
154        .await
155        .map_err(|e| {
156            anyhow::anyhow!(
157                "Failed to get block info for chain {}: {}",
158                chain.chain_id,
159                e
160            )
161        })?;
162
163        Ok(CachedBlocksConsistencyStatus::new(
164            result.0 as u64,
165            result.1 as u64,
166        ))
167    }
168
169    /// Inserts or updates a block record in the database.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the database operation fails.
174    pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
175        sqlx::query(
176            r"
177            INSERT INTO block (
178                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
179                base_fee_per_gas, blob_gas_used, excess_blob_gas,
180                l1_gas_price, l1_gas_used, l1_fee_scalar
181            ) VALUES (
182                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
183            )
184            ON CONFLICT (chain_id, number)
185            DO UPDATE
186            SET
187                hash = $3,
188                parent_hash = $4,
189                miner = $5,
190                gas_limit = $6,
191                gas_used = $7,
192                timestamp = $8,
193                base_fee_per_gas = $9,
194                blob_gas_used = $10,
195                excess_blob_gas = $11,
196                l1_gas_price = $12,
197                l1_gas_used = $13,
198                l1_fee_scalar = $14
199        ",
200        )
201        .bind(chain_id as i32)
202        .bind(block.number as i64)
203        .bind(block.hash.as_str())
204        .bind(block.parent_hash.as_str())
205        .bind(block.miner.as_str())
206        .bind(block.gas_limit as i64)
207        .bind(block.gas_used as i64)
208        .bind(block.timestamp.to_string())
209        .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
210        .bind(block.blob_gas_used.as_ref().map(U256::to_string))
211        .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
212        .bind(block.l1_gas_price.as_ref().map(U256::to_string))
213        .bind(block.l1_gas_used.map(|v| v as i64))
214        .bind(block.l1_fee_scalar.map(|v| v as i64))
215        .execute(&self.pool)
216        .await
217        .map(|_| ())
218        .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
219    }
220
221    /// Inserts multiple blocks in a single database operation using UNNEST for optimal performance.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the database operation fails.
226    pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
227        if blocks.is_empty() {
228            return Ok(());
229        }
230
231        // Prepare vectors for each column
232        let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
233        let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
234        let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
235        let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
236        let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
237        let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
238        let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
239        let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
240        let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
241        let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242        let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243        let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
244        let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
245
246        // Fill vectors from blocks
247        for block in blocks {
248            numbers.push(block.number as i64);
249            hashes.push(block.hash.clone());
250            parent_hashes.push(block.parent_hash.clone());
251            miners.push(block.miner.to_string());
252            gas_limits.push(block.gas_limit as i64);
253            gas_useds.push(block.gas_used as i64);
254            timestamps.push(block.timestamp.to_string());
255            base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
256            blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
257            excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
258            l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
259            l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
260            l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
261        }
262
263        // Execute batch insert with UNNEST
264        sqlx::query(
265            r"
266            INSERT INTO block (
267                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
268                base_fee_per_gas, blob_gas_used, excess_blob_gas,
269                l1_gas_price, l1_gas_used, l1_fee_scalar
270            )
271            SELECT
272                $1, *
273            FROM UNNEST(
274                $2::int8[], $3::text[], $4::text[], $5::text[],
275                $6::int8[], $7::int8[], $8::text[],
276                $9::text[], $10::text[], $11::text[],
277                $12::text[], $13::int8[], $14::int8[]
278            )
279            ON CONFLICT (chain_id, number) DO NOTHING
280           ",
281        )
282        .bind(chain_id as i32)
283        .bind(&numbers[..])
284        .bind(&hashes[..])
285        .bind(&parent_hashes[..])
286        .bind(&miners[..])
287        .bind(&gas_limits[..])
288        .bind(&gas_useds[..])
289        .bind(&timestamps[..])
290        .bind(&base_fee_per_gases as &[Option<String>])
291        .bind(&blob_gas_useds as &[Option<String>])
292        .bind(&excess_blob_gases as &[Option<String>])
293        .bind(&l1_gas_prices as &[Option<String>])
294        .bind(&l1_gas_useds as &[Option<i64>])
295        .bind(&l1_fee_scalars as &[Option<i64>])
296        .execute(&self.pool)
297        .await
298        .map(|_| ())
299        .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
300    }
301
302    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
303    ///
304    /// This method is significantly faster than INSERT for bulk operations as it bypasses
305    /// SQL parsing and uses PostgreSQL's native binary protocol.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the COPY operation fails.
310    pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
311        let copy_handler = PostgresCopyHandler::new(&self.pool);
312        copy_handler.copy_blocks(chain_id, blocks).await
313    }
314
315    /// Inserts tokens using PostgreSQL COPY BINARY for maximum performance.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if the COPY operation fails.
320    pub async fn add_tokens_copy(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
321        let copy_handler = PostgresCopyHandler::new(&self.pool);
322        copy_handler.copy_tokens(chain_id, tokens).await
323    }
324
325    /// Inserts pools using PostgreSQL COPY BINARY for maximum performance.
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if the COPY operation fails.
330    pub async fn add_pools_copy(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
331        let copy_handler = PostgresCopyHandler::new(&self.pool);
332        copy_handler.copy_pools(chain_id, pools).await
333    }
334
335    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
336    ///
337    /// This method is significantly faster than INSERT for bulk operations as it bypasses
338    /// SQL parsing and uses PostgreSQL's native binary protocol.
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if the COPY operation fails.
343    pub async fn add_pool_swaps_copy(
344        &self,
345        chain_id: u32,
346        swaps: &[PoolSwap],
347    ) -> anyhow::Result<()> {
348        let copy_handler = PostgresCopyHandler::new(&self.pool);
349        copy_handler.copy_pool_swaps(chain_id, swaps).await
350    }
351
352    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
353    ///
354    /// This method is significantly faster than INSERT for bulk operations as it bypasses
355    /// SQL parsing and uses PostgreSQL's native binary protocol.
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if the COPY operation fails.
360    pub async fn add_pool_liquidity_updates_copy(
361        &self,
362        chain_id: u32,
363        updates: &[PoolLiquidityUpdate],
364    ) -> anyhow::Result<()> {
365        let copy_handler = PostgresCopyHandler::new(&self.pool);
366        copy_handler
367            .copy_pool_liquidity_updates(chain_id, updates)
368            .await
369    }
370
371    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
372    ///
373    /// This method is significantly faster than INSERT for bulk operations as it bypasses
374    /// SQL parsing and most database validation checks.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if the COPY operation fails.
379    pub async fn copy_pool_fee_collects_batch(
380        &self,
381        chain_id: u32,
382        collects: &[PoolFeeCollect],
383    ) -> anyhow::Result<()> {
384        let copy_handler = PostgresCopyHandler::new(&self.pool);
385        copy_handler.copy_pool_collects(chain_id, collects).await
386    }
387
388    /// Retrieves block timestamps for a given chain starting from a specific block number.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the database query fails.
393    pub async fn load_block_timestamps(
394        &self,
395        chain: SharedChain,
396        from_block: u64,
397    ) -> anyhow::Result<Vec<BlockTimestampRow>> {
398        sqlx::query_as::<_, BlockTimestampRow>(
399            r"
400            SELECT
401                number,
402                timestamp
403            FROM block
404            WHERE chain_id = $1 AND number >= $2
405            ORDER BY number ASC
406            ",
407        )
408        .bind(chain.chain_id as i32)
409        .bind(from_block as i64)
410        .fetch_all(&self.pool)
411        .await
412        .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
413    }
414
415    /// Adds or updates a DEX (Decentralized Exchange) record in the database.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the database operation fails.
420    pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
421        sqlx::query(
422            r"
423            INSERT INTO dex (
424                chain_id, name, factory_address, creation_block
425            ) VALUES ($1, $2, $3, $4)
426            ON CONFLICT (chain_id, name)
427            DO UPDATE
428            SET
429                factory_address = $3,
430                creation_block = $4
431        ",
432        )
433        .bind(dex.chain.chain_id as i32)
434        .bind(dex.name.to_string())
435        .bind(dex.factory.to_string())
436        .bind(dex.factory_creation_block as i64)
437        .execute(&self.pool)
438        .await
439        .map(|_| ())
440        .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
441    }
442
443    /// Adds or updates a liquidity pool/pair record in the database.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if the database operation fails.
448    pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
449        sqlx::query(
450            r"
451            INSERT INTO pool (
452                chain_id, address, dex_name, creation_block,
453                token0_chain, token0_address,
454                token1_chain, token1_address,
455                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
456            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
457            ON CONFLICT (chain_id, address)
458            DO UPDATE
459            SET
460                dex_name = $3,
461                creation_block = $4,
462                token0_chain = $5,
463                token0_address = $6,
464                token1_chain = $7,
465                token1_address = $8,
466                fee = $9,
467                tick_spacing = $10,
468                initial_tick = $11,
469                initial_sqrt_price_x96 = $12
470        ",
471        )
472        .bind(pool.chain.chain_id as i32)
473        .bind(pool.address.to_string())
474        .bind(pool.dex.name.to_string())
475        .bind(pool.creation_block as i64)
476        .bind(pool.token0.chain.chain_id as i32)
477        .bind(pool.token0.address.to_string())
478        .bind(pool.token1.chain.chain_id as i32)
479        .bind(pool.token1.address.to_string())
480        .bind(pool.fee.map(|fee| fee as i32))
481        .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
482        .bind(pool.initial_tick)
483        .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
484        .execute(&self.pool)
485        .await
486        .map(|_| ())
487        .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
488    }
489
490    /// Inserts multiple pools in a single database operation using UNNEST for optimal performance.
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if the database operation fails.
495    pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
496        if pools.is_empty() {
497            return Ok(());
498        }
499
500        // Prepare vectors for each column
501        let len = pools.len();
502        let mut addresses: Vec<String> = Vec::with_capacity(len);
503        let mut dex_names: Vec<String> = Vec::with_capacity(len);
504        let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
505        let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
506        let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
507        let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
508        let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
509        let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
510        let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
511        let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
512        let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
513        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
514
515        // Fill vectors from pools
516        for pool in pools {
517            chain_ids.push(pool.chain.chain_id as i32);
518            addresses.push(pool.address.to_string());
519            dex_names.push(pool.dex.name.to_string());
520            creation_blocks.push(pool.creation_block as i64);
521            token0_chains.push(pool.token0.chain.chain_id as i32);
522            token0_addresses.push(pool.token0.address.to_string());
523            token1_chains.push(pool.token1.chain.chain_id as i32);
524            token1_addresses.push(pool.token1.address.to_string());
525            fees.push(pool.fee.map(|fee| fee as i32));
526            tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
527            initial_ticks.push(pool.initial_tick);
528            initial_sqrt_price_x96s
529                .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
530        }
531
532        // Execute batch insert with UNNEST
533        sqlx::query(
534            r"
535            INSERT INTO pool (
536                chain_id, address, dex_name, creation_block,
537                token0_chain, token0_address,
538                token1_chain, token1_address,
539                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
540            )
541            SELECT *
542            FROM UNNEST(
543                $1::int4[], $2::text[], $3::text[], $4::int8[],
544                $5::int4[], $6::text[], $7::int4[], $8::text[],
545                $9::int4[], $10::int4[], $11::int4[], $12::text[]
546            )
547            ON CONFLICT (chain_id, address) DO NOTHING
548           ",
549        )
550        .bind(&chain_ids[..])
551        .bind(&addresses[..])
552        .bind(&dex_names[..])
553        .bind(&creation_blocks[..])
554        .bind(&token0_chains[..])
555        .bind(&token0_addresses[..])
556        .bind(&token1_chains[..])
557        .bind(&token1_addresses[..])
558        .bind(&fees[..])
559        .bind(&tick_spacings[..])
560        .bind(&initial_ticks[..])
561        .bind(&initial_sqrt_price_x96s[..])
562        .execute(&self.pool)
563        .await
564        .map(|_| ())
565        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
566    }
567
568    /// Inserts multiple pool swaps in a single database operation using UNNEST for optimal performance.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the database operation fails.
573    pub async fn add_pool_swaps_batch(
574        &self,
575        chain_id: u32,
576        swaps: &[PoolSwap],
577    ) -> anyhow::Result<()> {
578        if swaps.is_empty() {
579            return Ok(());
580        }
581
582        // Prepare vectors for each column
583        let len = swaps.len();
584        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
585        let mut blocks: Vec<i64> = Vec::with_capacity(len);
586        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
587        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
588        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
589        let mut senders: Vec<String> = Vec::with_capacity(len);
590        let mut recipients: Vec<String> = Vec::with_capacity(len);
591        let mut sides: Vec<Option<String>> = Vec::with_capacity(len);
592        let mut sizes: Vec<Option<String>> = Vec::with_capacity(len);
593        let mut prices: Vec<Option<String>> = Vec::with_capacity(len);
594        let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
595        let mut liquidities: Vec<String> = Vec::with_capacity(len);
596        let mut ticks: Vec<i32> = Vec::with_capacity(len);
597        let mut amount0s: Vec<String> = Vec::with_capacity(len);
598        let mut amount1s: Vec<String> = Vec::with_capacity(len);
599        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
600
601        // Fill vectors from swaps
602        for swap in swaps {
603            chain_ids.push(chain_id as i32);
604            pool_addresses.push(swap.pool_address.to_string());
605            blocks.push(swap.block as i64);
606            transaction_hashes.push(swap.transaction_hash.clone());
607            transaction_indices.push(swap.transaction_index as i32);
608            log_indices.push(swap.log_index as i32);
609            senders.push(swap.sender.to_string());
610            recipients.push(swap.recipient.to_string());
611            sides.push(swap.side.map(|side| side.to_string()));
612            sizes.push(swap.size.map(|size| size.to_string()));
613            prices.push(swap.price.map(|price| price.to_string()));
614            sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
615            liquidities.push(swap.liquidity.to_string());
616            ticks.push(swap.tick);
617            amount0s.push(swap.amount0.to_string());
618            amount1s.push(swap.amount1.to_string());
619        }
620
621        // Execute batch insert with UNNEST
622        sqlx::query(
623            r"
624            INSERT INTO pool_swap_event (
625                chain_id, pool_address, block, transaction_hash, transaction_index,
626                log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1
627            )
628            SELECT
629                chain_id, pool_address, block, transaction_hash, transaction_index, log_index, sender, recipient,
630                side, size, price, sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256
631            FROM UNNEST(
632                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[], $6::INT[],
633                $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[],
634                $12::TEXT[], $13::TEXT[], $14::INT[], $15::TEXT[], $16::TEXT[]
635            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
636                   log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1)
637            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
638           ",
639        )
640        .bind(&chain_ids[..])
641        .bind(&pool_addresses[..])
642        .bind(&blocks[..])
643        .bind(&transaction_hashes[..])
644        .bind(&transaction_indices[..])
645        .bind(&log_indices[..])
646        .bind(&senders[..])
647        .bind(&recipients[..])
648        .bind(&sides[..])
649        .bind(&sizes[..])
650        .bind(&prices[..])
651        .bind(&sqrt_price_x96s[..])
652        .bind(&liquidities[..])
653        .bind(&ticks[..])
654        .bind(&amount0s[..])
655        .bind(&amount1s[..])
656        .execute(&self.pool)
657        .await
658        .map(|_| ())
659        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
660    }
661
662    /// Inserts multiple pool liquidity updates in a single database operation using UNNEST for optimal performance.
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the database operation fails.
667    pub async fn add_pool_liquidity_updates_batch(
668        &self,
669        chain_id: u32,
670        updates: &[PoolLiquidityUpdate],
671    ) -> anyhow::Result<()> {
672        if updates.is_empty() {
673            return Ok(());
674        }
675
676        // Prepare vectors for each column
677        let len = updates.len();
678        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
679        let mut blocks: Vec<i64> = Vec::with_capacity(len);
680        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
681        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
682        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
683        let mut event_types: Vec<String> = Vec::with_capacity(len);
684        let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
685        let mut owners: Vec<String> = Vec::with_capacity(len);
686        let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
687        let mut amount0s: Vec<String> = Vec::with_capacity(len);
688        let mut amount1s: Vec<String> = Vec::with_capacity(len);
689        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
690        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
691        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
692
693        // Fill vectors from updates
694        for update in updates {
695            chain_ids.push(chain_id as i32);
696            pool_addresses.push(update.pool_address.to_string());
697            blocks.push(update.block as i64);
698            transaction_hashes.push(update.transaction_hash.clone());
699            transaction_indices.push(update.transaction_index as i32);
700            log_indices.push(update.log_index as i32);
701            event_types.push(update.kind.to_string());
702            senders.push(update.sender.map(|s| s.to_string()));
703            owners.push(update.owner.to_string());
704            position_liquidities.push(update.position_liquidity.to_string());
705            amount0s.push(update.amount0.to_string());
706            amount1s.push(update.amount1.to_string());
707            tick_lowers.push(update.tick_lower);
708            tick_uppers.push(update.tick_upper);
709        }
710
711        // Execute batch insert with UNNEST
712        sqlx::query(
713            r"
714            INSERT INTO pool_liquidity_event (
715                chain_id, pool_address, block, transaction_hash, transaction_index,
716                log_index, event_type, sender, owner, position_liquidity,
717                amount0, amount1, tick_lower, tick_upper
718            )
719            SELECT
720                chain_id, pool_address, block, transaction_hash, transaction_index,
721                log_index, event_type, sender, owner, position_liquidity::u128,
722                amount0::U256, amount1::U256, tick_lower, tick_upper
723            FROM UNNEST(
724                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
725                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[],
726                $11::TEXT[], $12::TEXT[], $13::INT[], $14::INT[]
727            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
728                   log_index, event_type, sender, owner, position_liquidity,
729                   amount0, amount1, tick_lower, tick_upper)
730            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
731           ",
732        )
733        .bind(&chain_ids[..])
734        .bind(&pool_addresses[..])
735        .bind(&blocks[..])
736        .bind(&transaction_hashes[..])
737        .bind(&transaction_indices[..])
738        .bind(&log_indices[..])
739        .bind(&event_types[..])
740        .bind(&senders[..])
741        .bind(&owners[..])
742        .bind(&position_liquidities[..])
743        .bind(&amount0s[..])
744        .bind(&amount1s[..])
745        .bind(&tick_lowers[..])
746        .bind(&tick_uppers[..])
747        .execute(&self.pool)
748        .await
749        .map(|_| ())
750        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
751    }
752
753    /// Adds or updates a token record in the database.
754    ///
755    /// # Errors
756    ///
757    /// Returns an error if the database operation fails.
758    pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
759        sqlx::query(
760            r"
761            INSERT INTO token (
762                chain_id, address, name, symbol, decimals
763            ) VALUES ($1, $2, $3, $4, $5)
764            ON CONFLICT (chain_id, address)
765            DO UPDATE
766            SET
767                name = $3,
768                symbol = $4,
769                decimals = $5
770        ",
771        )
772        .bind(token.chain.chain_id as i32)
773        .bind(token.address.to_string())
774        .bind(token.name.as_str())
775        .bind(token.symbol.as_str())
776        .bind(i32::from(token.decimals))
777        .execute(&self.pool)
778        .await
779        .map(|_| ())
780        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
781    }
782
783    /// Records an invalid token address with associated error information.
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if the database insertion fails.
788    pub async fn add_invalid_token(
789        &self,
790        chain_id: u32,
791        address: &Address,
792        error_string: &str,
793    ) -> anyhow::Result<()> {
794        sqlx::query(
795            r"
796            INSERT INTO token (
797                chain_id, address, error
798            ) VALUES ($1, $2, $3)
799            ON CONFLICT (chain_id, address)
800            DO NOTHING;
801        ",
802        )
803        .bind(chain_id as i32)
804        .bind(address.to_string())
805        .bind(error_string)
806        .execute(&self.pool)
807        .await
808        .map(|_| ())
809        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
810    }
811
812    /// Persists a token swap transaction event to the `pool_swap` table.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if the database operation fails.
817    pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
818        sqlx::query(
819            r"
820            INSERT INTO pool_swap_event (
821                chain_id, pool_address, block, transaction_hash, transaction_index,
822                log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
823            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
824            ON CONFLICT (chain_id, transaction_hash, log_index)
825            DO NOTHING
826        ",
827        )
828        .bind(chain_id as i32)
829        .bind(swap.pool_address.to_string())
830        .bind(swap.block as i64)
831        .bind(swap.transaction_hash.as_str())
832        .bind(swap.transaction_index as i32)
833        .bind(swap.log_index as i32)
834        .bind(swap.sender.to_string())
835        .bind(swap.recipient.to_string())
836        .bind(swap.side.map(|side| side.to_string()))
837        .bind(swap.size.map(|size| size.to_string()))
838        .bind(swap.price.map(|price| price.to_string()))
839        .bind(swap.sqrt_price_x96.to_string())
840        .bind(swap.amount0.to_string())
841        .bind(swap.amount1.to_string())
842        .execute(&self.pool)
843        .await
844        .map(|_| ())
845        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
846    }
847
848    /// Persists a liquidity position change (mint/burn) event to the `pool_liquidity` table.
849    ///
850    /// # Errors
851    ///
852    /// Returns an error if the database operation fails.
853    pub async fn add_pool_liquidity_update(
854        &self,
855        chain_id: u32,
856        liquidity_update: &PoolLiquidityUpdate,
857    ) -> anyhow::Result<()> {
858        sqlx::query(
859            r"
860            INSERT INTO pool_liquidity_event (
861                chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
862                event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
863            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
864            ON CONFLICT (chain_id, transaction_hash, log_index)
865            DO NOTHING
866        ",
867        )
868        .bind(chain_id as i32)
869        .bind(liquidity_update.pool_address.to_string())
870        .bind(liquidity_update.block as i64)
871        .bind(liquidity_update.transaction_hash.as_str())
872        .bind(liquidity_update.transaction_index as i32)
873        .bind(liquidity_update.log_index as i32)
874        .bind(liquidity_update.kind.to_string())
875        .bind(liquidity_update.sender.map(|sender| sender.to_string()))
876        .bind(liquidity_update.owner.to_string())
877        .bind(U128Pg(liquidity_update.position_liquidity))
878        .bind(U256Pg(liquidity_update.amount0))
879        .bind(U256Pg(liquidity_update.amount1))
880        .bind(liquidity_update.tick_lower)
881        .bind(liquidity_update.tick_upper)
882        .execute(&self.pool)
883        .await
884        .map(|_| ())
885        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
886    }
887
888    /// Retrieves all valid token records for the given chain and converts them into `Token` domain objects.
889    ///
890    /// Only returns tokens that do not contain error information, filtering out invalid tokens
891    /// that were previously recorded with error details.
892    ///
893    /// # Errors
894    ///
895    /// Returns an error if the database query fails.
896    pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
897        sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
898            .bind(chain.chain_id as i32)
899            .fetch_all(&self.pool)
900            .await
901            .map(|rows| {
902                rows.into_iter()
903                    .map(|token_row| {
904                        Token::new(
905                            chain.clone(),
906                            token_row.address,
907                            token_row.name,
908                            token_row.symbol,
909                            token_row.decimals as u8,
910                        )
911                    })
912                    .collect::<Vec<_>>()
913            })
914            .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
915    }
916
917    /// Retrieves all invalid token addresses for a given chain.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the database query fails or address validation fails.
922    pub async fn load_invalid_token_addresses(
923        &self,
924        chain_id: u32,
925    ) -> anyhow::Result<Vec<Address>> {
926        sqlx::query_as::<_, (String,)>(
927            "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
928        )
929        .bind(chain_id as i32)
930        .fetch_all(&self.pool)
931        .await?
932        .into_iter()
933        .map(|(address,)| validate_address(&address))
934        .collect::<Result<Vec<_>, _>>()
935        .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
936    }
937
938    /// Loads pool data from the database for the specified chain and DEX.
939    ///
940    /// # Errors
941    ///
942    /// Returns an error if the database query fails, the connection to the database is lost, or the query parameters are invalid.
943    pub async fn load_pools(
944        &self,
945        chain: SharedChain,
946        dex_id: &str,
947    ) -> anyhow::Result<Vec<PoolRow>> {
948        sqlx::query_as::<_, PoolRow>(
949            r"
950            SELECT
951                address,
952                dex_name,
953                creation_block,
954                token0_chain,
955                token0_address,
956                token1_chain,
957                token1_address,
958                fee,
959                tick_spacing,
960                initial_tick,
961                initial_sqrt_price_x96
962            FROM pool
963            WHERE chain_id = $1 AND dex_name = $2
964            ORDER BY creation_block ASC
965        ",
966        )
967        .bind(chain.chain_id as i32)
968        .bind(dex_id)
969        .fetch_all(&self.pool)
970        .await
971        .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
972    }
973
974    /// Toggles performance optimization settings for sync operations.
975    ///
976    /// When enabled (true), applies settings for maximum write performance:
977    /// - `synchronous_commit` = OFF
978    /// - `work_mem` increased for bulk operations
979    ///
980    /// When disabled (false), restores default safe settings:
981    /// - `synchronous_commit` = ON (data safety)
982    /// - `work_mem` back to default
983    ///
984    /// # Errors
985    ///
986    /// Returns an error if the database operations fail.
987    pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
988        if enable {
989            tracing::info!("Enabling performance sync settings for bulk operations");
990
991            // Set synchronous_commit to OFF for maximum write performance
992            sqlx::query("SET synchronous_commit = OFF")
993                .execute(&self.pool)
994                .await
995                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
996
997            // Increase work_mem for bulk operations
998            sqlx::query("SET work_mem = '256MB'")
999                .execute(&self.pool)
1000                .await
1001                .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
1002
1003            tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
1004        } else {
1005            tracing::info!("Restoring default safe database performance settings");
1006
1007            // Restore synchronous_commit to ON for data safety
1008            sqlx::query("SET synchronous_commit = ON")
1009                .execute(&self.pool)
1010                .await
1011                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
1012
1013            // Reset work_mem to default
1014            sqlx::query("RESET work_mem")
1015                .execute(&self.pool)
1016                .await
1017                .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
1018        }
1019
1020        Ok(())
1021    }
1022
1023    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
1024    ///
1025    /// # Errors
1026    ///
1027    /// Returns an error if the database operation fails.
1028    pub async fn update_dex_last_synced_block(
1029        &self,
1030        chain_id: u32,
1031        dex: &DexType,
1032        block_number: u64,
1033    ) -> anyhow::Result<()> {
1034        sqlx::query(
1035            r"
1036            UPDATE dex
1037            SET last_full_sync_pools_block_number = $3
1038            WHERE chain_id = $1 AND name = $2
1039            ",
1040        )
1041        .bind(chain_id as i32)
1042        .bind(dex.to_string())
1043        .bind(block_number as i64)
1044        .execute(&self.pool)
1045        .await
1046        .map(|_| ())
1047        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1048    }
1049
1050    pub async fn update_pool_last_synced_block(
1051        &self,
1052        chain_id: u32,
1053        dex: &DexType,
1054        pool_address: &Address,
1055        block_number: u64,
1056    ) -> anyhow::Result<()> {
1057        sqlx::query(
1058            r"
1059            UPDATE pool
1060            SET last_full_sync_block_number = $4
1061            WHERE chain_id = $1
1062            AND dex_name = $2
1063            AND address = $3
1064            ",
1065        )
1066        .bind(chain_id as i32)
1067        .bind(dex.to_string())
1068        .bind(pool_address.to_string())
1069        .bind(block_number as i64)
1070        .execute(&self.pool)
1071        .await
1072        .map(|_| ())
1073        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1074    }
1075
1076    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
1077    ///
1078    /// # Errors
1079    ///
1080    /// Returns an error if the database query fails.
1081    pub async fn get_dex_last_synced_block(
1082        &self,
1083        chain_id: u32,
1084        dex: &DexType,
1085    ) -> anyhow::Result<Option<u64>> {
1086        let result = sqlx::query_as::<_, (Option<i64>,)>(
1087            r#"
1088            SELECT
1089                last_full_sync_pools_block_number
1090            FROM dex
1091            WHERE chain_id = $1
1092            AND name = $2
1093            "#,
1094        )
1095        .bind(chain_id as i32)
1096        .bind(dex.to_string())
1097        .fetch_optional(&self.pool)
1098        .await
1099        .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1100
1101        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1102    }
1103
1104    pub async fn get_pool_last_synced_block(
1105        &self,
1106        chain_id: u32,
1107        dex: &DexType,
1108        pool_address: &Address,
1109    ) -> anyhow::Result<Option<u64>> {
1110        let result = sqlx::query_as::<_, (Option<i64>,)>(
1111            r#"
1112            SELECT
1113                last_full_sync_block_number
1114            FROM pool
1115            WHERE chain_id = $1
1116            AND dex_name = $2
1117            AND address = $3
1118            "#,
1119        )
1120        .bind(chain_id as i32)
1121        .bind(dex.to_string())
1122        .bind(pool_address.to_string())
1123        .fetch_optional(&self.pool)
1124        .await
1125        .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1126
1127        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1128    }
1129
1130    /// Retrieves the maximum block number from a specific table for a given pool.
1131    /// This is useful to detect orphaned data where events were inserted but progress wasn't updated.
1132    ///
1133    /// # Errors
1134    ///
1135    /// Returns an error if the database query fails.
1136    pub async fn get_table_last_block(
1137        &self,
1138        chain_id: u32,
1139        table_name: &str,
1140        pool_address: &Address,
1141    ) -> anyhow::Result<Option<u64>> {
1142        let query = format!(
1143            "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
1144            table_name
1145        );
1146        let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1147            .bind(chain_id as i32)
1148            .bind(pool_address.to_string())
1149            .fetch_optional(&self.pool)
1150            .await
1151            .map_err(|e| {
1152                anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
1153            })?;
1154
1155        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1156    }
1157
1158    /// Adds a batch of pool fee collect events to the database using batch operations.
1159    ///
1160    /// # Errors
1161    ///
1162    /// Returns an error if the database operation fails.
1163    pub async fn add_pool_collects_batch(
1164        &self,
1165        chain_id: u32,
1166        collects: &[PoolFeeCollect],
1167    ) -> anyhow::Result<()> {
1168        if collects.is_empty() {
1169            return Ok(());
1170        }
1171
1172        // Prepare vectors for each column
1173        let len = collects.len();
1174        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1175        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1176        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1177        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1178        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1179        let mut owners: Vec<String> = Vec::with_capacity(len);
1180        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1181        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1182        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1183        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1184        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1185
1186        // Fill vectors from collects
1187        for collect in collects {
1188            chain_ids.push(chain_id as i32);
1189            pool_addresses.push(collect.pool_address.to_string());
1190            blocks.push(collect.block as i64);
1191            transaction_hashes.push(collect.transaction_hash.clone());
1192            transaction_indices.push(collect.transaction_index as i32);
1193            log_indices.push(collect.log_index as i32);
1194            owners.push(collect.owner.to_string());
1195            amount0s.push(collect.amount0.to_string());
1196            amount1s.push(collect.amount1.to_string());
1197            tick_lowers.push(collect.tick_lower);
1198            tick_uppers.push(collect.tick_upper);
1199        }
1200
1201        // Execute batch insert with UNNEST
1202        sqlx::query(
1203            r"
1204            INSERT INTO pool_collect_event (
1205                chain_id, pool_address, block, transaction_hash, transaction_index,
1206                log_index, owner, amount0, amount1, tick_lower, tick_upper
1207            )
1208            SELECT
1209                chain_id, pool_address, block, transaction_hash, transaction_index,
1210                log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1211            FROM UNNEST(
1212                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1213                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::INT[], $11::INT[]
1214            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1215                   log_index, owner, amount0, amount1, tick_lower, tick_upper)
1216            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1217           ",
1218        )
1219        .bind(&chain_ids[..])
1220        .bind(&pool_addresses[..])
1221        .bind(&blocks[..])
1222        .bind(&transaction_hashes[..])
1223        .bind(&transaction_indices[..])
1224        .bind(&log_indices[..])
1225        .bind(&owners[..])
1226        .bind(&amount0s[..])
1227        .bind(&amount1s[..])
1228        .bind(&tick_lowers[..])
1229        .bind(&tick_uppers[..])
1230        .execute(&self.pool)
1231        .await
1232        .map(|_| ())
1233        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1234    }
1235
1236    /// Inserts multiple pool flash events in a single database operation using UNNEST for optimal performance.
1237    ///
1238    /// # Errors
1239    ///
1240    /// Returns an error if the database operation fails.
1241    pub async fn add_pool_flash_batch(
1242        &self,
1243        chain_id: u32,
1244        flash_events: &[PoolFlash],
1245    ) -> anyhow::Result<()> {
1246        if flash_events.is_empty() {
1247            return Ok(());
1248        }
1249
1250        // Prepare vectors for each column
1251        let len = flash_events.len();
1252        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1253        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1254        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1255        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1256        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1257        let mut senders: Vec<String> = Vec::with_capacity(len);
1258        let mut recipients: Vec<String> = Vec::with_capacity(len);
1259        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1260        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1261        let mut paid0s: Vec<String> = Vec::with_capacity(len);
1262        let mut paid1s: Vec<String> = Vec::with_capacity(len);
1263        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1264
1265        // Fill vectors from flash events
1266        for flash in flash_events {
1267            chain_ids.push(chain_id as i32);
1268            pool_addresses.push(flash.pool_address.to_string());
1269            blocks.push(flash.block as i64);
1270            transaction_hashes.push(flash.transaction_hash.clone());
1271            transaction_indices.push(flash.transaction_index as i32);
1272            log_indices.push(flash.log_index as i32);
1273            senders.push(flash.sender.to_string());
1274            recipients.push(flash.recipient.to_string());
1275            amount0s.push(flash.amount0.to_string());
1276            amount1s.push(flash.amount1.to_string());
1277            paid0s.push(flash.paid0.to_string());
1278            paid1s.push(flash.paid1.to_string());
1279        }
1280
1281        // Execute batch insert with UNNEST
1282        sqlx::query(
1283            r"
1284            INSERT INTO pool_flash_event (
1285                chain_id, pool_address, block, transaction_hash, transaction_index,
1286                log_index, sender, recipient, amount0, amount1, paid0, paid1
1287            )
1288            SELECT
1289                chain_id, pool_address, block, transaction_hash, transaction_index,
1290                log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1291            FROM UNNEST(
1292                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1293                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[]
1294            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1295                   log_index, sender, recipient, amount0, amount1, paid0, paid1)
1296            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1297           ",
1298        )
1299        .bind(&chain_ids[..])
1300        .bind(&pool_addresses[..])
1301        .bind(&blocks[..])
1302        .bind(&transaction_hashes[..])
1303        .bind(&transaction_indices[..])
1304        .bind(&log_indices[..])
1305        .bind(&senders[..])
1306        .bind(&recipients[..])
1307        .bind(&amount0s[..])
1308        .bind(&amount1s[..])
1309        .bind(&paid0s[..])
1310        .bind(&paid1s[..])
1311        .execute(&self.pool)
1312        .await
1313        .map(|_| ())
1314        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1315    }
1316
1317    pub async fn add_pool_snapshot(
1318        &self,
1319        chain_id: u32,
1320        pool_address: &Address,
1321        snapshot: &PoolSnapshot,
1322    ) -> anyhow::Result<()> {
1323        sqlx::query(
1324            r"
1325            INSERT INTO pool_snapshot (
1326                chain_id, pool_address, block, transaction_index, log_index, transaction_hash,
1327                current_tick, price_sqrt_ratio_x96, liquidity,
1328                protocol_fees_token0, protocol_fees_token1, fee_protocol,
1329                fee_growth_global_0, fee_growth_global_1,
1330                total_amount0_deposited, total_amount1_deposited,
1331                total_amount0_collected, total_amount1_collected,
1332                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1333                liquidity_utilization_rate
1334            ) VALUES (
1335                $1, $2, $3, $4, $5, $6,
1336                $7, $8::U160, $9::U128, $10::U256, $11::U256, $12,
1337                $13::U256, $14::U256, $15::U256, $16::U256, $17::U256, $18::U256,
1338                $19, $20, $21, $22, $23, $24
1339            )
1340            ON CONFLICT (chain_id, pool_address, block, transaction_index, log_index)
1341            DO NOTHING
1342            ",
1343        )
1344        .bind(chain_id as i32)
1345        .bind(pool_address.to_string())
1346        .bind(snapshot.block_position.number as i64)
1347        .bind(snapshot.block_position.transaction_index as i32)
1348        .bind(snapshot.block_position.log_index as i32)
1349        .bind(snapshot.block_position.transaction_hash.to_string())
1350        .bind(snapshot.state.current_tick)
1351        .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1352        .bind(snapshot.state.liquidity.to_string())
1353        .bind(snapshot.state.protocol_fees_token0.to_string())
1354        .bind(snapshot.state.protocol_fees_token1.to_string())
1355        .bind(snapshot.state.fee_protocol as i16)
1356        .bind(snapshot.state.fee_growth_global_0.to_string())
1357        .bind(snapshot.state.fee_growth_global_1.to_string())
1358        .bind(snapshot.analytics.total_amount0_deposited.to_string())
1359        .bind(snapshot.analytics.total_amount1_deposited.to_string())
1360        .bind(snapshot.analytics.total_amount0_collected.to_string())
1361        .bind(snapshot.analytics.total_amount1_collected.to_string())
1362        .bind(snapshot.analytics.total_swaps as i32)
1363        .bind(snapshot.analytics.total_mints as i32)
1364        .bind(snapshot.analytics.total_burns as i32)
1365        .bind(snapshot.analytics.total_fee_collects as i32)
1366        .bind(snapshot.analytics.total_flashes as i32)
1367        .bind(snapshot.analytics.liquidity_utilization_rate)
1368        .execute(&self.pool)
1369        .await
1370        .map(|_| ())
1371        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1372    }
1373
1374    /// Inserts multiple pool positions in a single database operation using UNNEST for optimal performance.
1375    ///
1376    /// # Errors
1377    ///
1378    /// Returns an error if the database operation fails.
1379    pub async fn add_pool_positions_batch(
1380        &self,
1381        chain_id: u32,
1382        snapshot_block: u64,
1383        snapshot_transaction_index: u32,
1384        snapshot_log_index: u32,
1385        positions: &[(Address, PoolPosition)],
1386    ) -> anyhow::Result<()> {
1387        if positions.is_empty() {
1388            return Ok(());
1389        }
1390
1391        // Prepare vectors for each column
1392        let len = positions.len();
1393        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1394        let mut owners: Vec<String> = Vec::with_capacity(len);
1395        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1396        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1397        let mut liquidities: Vec<String> = Vec::with_capacity(len);
1398        let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1399        let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1400        let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1401        let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1402        let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1403        let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1404        let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1405        let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1406
1407        // Fill vectors from positions
1408        for (pool_address, position) in positions {
1409            pool_addresses.push(pool_address.to_string());
1410            owners.push(position.owner.to_string());
1411            tick_lowers.push(position.tick_lower);
1412            tick_uppers.push(position.tick_upper);
1413            liquidities.push(position.liquidity.to_string());
1414            fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1415            fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1416            tokens_owed_0s.push(position.tokens_owed_0.to_string());
1417            tokens_owed_1s.push(position.tokens_owed_1.to_string());
1418            total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1419            total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1420            total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1421            total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1422        }
1423
1424        // Execute batch insert with UNNEST
1425        sqlx::query(
1426            r"
1427            INSERT INTO pool_position (
1428                chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1429                owner, tick_lower, tick_upper,
1430                liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1431                tokens_owed_0, tokens_owed_1,
1432                total_amount0_deposited, total_amount1_deposited,
1433                total_amount0_collected, total_amount1_collected
1434            )
1435            SELECT
1436                $1, pool_address, $2, $3, $4,
1437                owner, tick_lower, tick_upper,
1438                liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1439                tokens_owed_0::U128, tokens_owed_1::U128,
1440                total_amount0_deposited::U256, total_amount1_deposited::U256,
1441                total_amount0_collected::U128, total_amount1_collected::U128
1442            FROM UNNEST(
1443                $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1444                $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1445                $16::TEXT[], $17::TEXT[]
1446            ) AS t(pool_address, owner, tick_lower, tick_upper,
1447                   liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1448                   tokens_owed_0, tokens_owed_1,
1449                   total_amount0_deposited, total_amount1_deposited,
1450                   total_amount0_collected, total_amount1_collected)
1451            ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1452            DO NOTHING
1453           ",
1454        )
1455        .bind(chain_id as i32)
1456        .bind(snapshot_block as i64)
1457        .bind(snapshot_transaction_index as i32)
1458        .bind(snapshot_log_index as i32)
1459        .bind(&pool_addresses[..])
1460        .bind(&owners[..])
1461        .bind(&tick_lowers[..])
1462        .bind(&tick_uppers[..])
1463        .bind(&liquidities[..])
1464        .bind(&fee_growth_inside_0_lasts[..])
1465        .bind(&fee_growth_inside_1_lasts[..])
1466        .bind(&tokens_owed_0s[..])
1467        .bind(&tokens_owed_1s[..])
1468        .bind(&total_amount0_depositeds as &[Option<String>])
1469        .bind(&total_amount1_depositeds as &[Option<String>])
1470        .bind(&total_amount0_collecteds as &[Option<String>])
1471        .bind(&total_amount1_collecteds as &[Option<String>])
1472        .execute(&self.pool)
1473        .await
1474        .map(|_| ())
1475        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1476    }
1477
1478    /// Inserts multiple pool ticks in a single database operation using UNNEST for optimal performance.
1479    ///
1480    /// # Errors
1481    ///
1482    /// Returns an error if the database operation fails.
1483    pub async fn add_pool_ticks_batch(
1484        &self,
1485        chain_id: u32,
1486        snapshot_block: u64,
1487        snapshot_transaction_index: u32,
1488        snapshot_log_index: u32,
1489        ticks: &[(Address, &PoolTick)],
1490    ) -> anyhow::Result<()> {
1491        if ticks.is_empty() {
1492            return Ok(());
1493        }
1494
1495        // Prepare vectors for each column
1496        let len = ticks.len();
1497        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1498        let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1499        let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1500        let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1501        let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1502        let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1503        let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1504        let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1505
1506        // Fill vectors from ticks
1507        for (pool_address, tick) in ticks {
1508            pool_addresses.push(pool_address.to_string());
1509            tick_values.push(tick.value);
1510            liquidity_grosses.push(tick.liquidity_gross.to_string());
1511            liquidity_nets.push(tick.liquidity_net.to_string());
1512            fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1513            fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1514            initializeds.push(tick.initialized);
1515            last_updated_blocks.push(tick.last_updated_block as i64);
1516        }
1517
1518        // Execute batch insert with UNNEST
1519        sqlx::query(
1520            r"
1521            INSERT INTO pool_tick (
1522                chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1523                tick_value, liquidity_gross, liquidity_net,
1524                fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1525            )
1526            SELECT
1527                $1, pool_address, $2, $3, $4,
1528                tick_value, liquidity_gross::U128, liquidity_net::I128,
1529                fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1530            FROM UNNEST(
1531                $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1532                $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1533            ) AS t(pool_address, tick_value, liquidity_gross, liquidity_net,
1534                   fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1535            ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1536            DO NOTHING
1537           ",
1538        )
1539        .bind(chain_id as i32)
1540        .bind(snapshot_block as i64)
1541        .bind(snapshot_transaction_index as i32)
1542        .bind(snapshot_log_index as i32)
1543        .bind(&pool_addresses[..])
1544        .bind(&tick_values[..])
1545        .bind(&liquidity_grosses[..])
1546        .bind(&liquidity_nets[..])
1547        .bind(&fee_growth_outside_0s[..])
1548        .bind(&fee_growth_outside_1s[..])
1549        .bind(&initializeds[..])
1550        .bind(&last_updated_blocks[..])
1551        .execute(&self.pool)
1552        .await
1553        .map(|_| ())
1554        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1555    }
1556
1557    pub async fn update_pool_initial_price_tick(
1558        &self,
1559        chain_id: u32,
1560        initialize_event: &InitializeEvent,
1561    ) -> anyhow::Result<()> {
1562        sqlx::query(
1563            r"
1564            UPDATE pool
1565            SET
1566                initial_tick = $4,
1567                initial_sqrt_price_x96 = $5
1568            WHERE chain_id = $1
1569            AND dex_name = $2
1570            AND address = $3
1571            ",
1572        )
1573        .bind(chain_id as i32)
1574        .bind(initialize_event.dex.name.to_string())
1575        .bind(initialize_event.pool_address.to_string())
1576        .bind(initialize_event.tick)
1577        .bind(initialize_event.sqrt_price_x96.to_string())
1578        .execute(&self.pool)
1579        .await
1580        .map(|_| ())
1581        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1582    }
1583
1584    /// Loads the latest valid pool snapshot from the database.
1585    ///
1586    /// Returns the most recent snapshot that has been validated against on-chain state.
1587    ///
1588    /// # Errors
1589    ///
1590    /// Returns an error if the database query fails.
1591    pub async fn load_latest_valid_pool_snapshot(
1592        &self,
1593        chain_id: u32,
1594        pool_address: &Address,
1595    ) -> anyhow::Result<Option<PoolSnapshot>> {
1596        let result = sqlx::query(
1597            r"
1598            SELECT
1599                block, transaction_index, log_index, transaction_hash,
1600                current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1601                protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1602                fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1603                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1604                total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1605                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1606                liquidity_utilization_rate,
1607                (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1608            FROM pool_snapshot
1609            WHERE chain_id = $1 AND pool_address = $2 AND is_valid = TRUE
1610            ORDER BY block DESC, transaction_index DESC, log_index DESC
1611            LIMIT 1
1612            ",
1613        )
1614        .bind(chain_id as i32)
1615        .bind(pool_address.to_string())
1616        .fetch_optional(&self.pool)
1617        .await
1618        .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {}", e))?;
1619
1620        if let Some(row) = result {
1621            // Parse snapshot state
1622            let block: i64 = row.get("block");
1623            let transaction_index: i32 = row.get("transaction_index");
1624            let log_index: i32 = row.get("log_index");
1625            let transaction_hash: String = row.get("transaction_hash");
1626
1627            let block_position = nautilus_model::defi::data::block::BlockPosition::new(
1628                block as u64,
1629                transaction_hash,
1630                transaction_index as u32,
1631                log_index as u32,
1632            );
1633
1634            let state = PoolState {
1635                current_tick: row.get("current_tick"),
1636                price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1637                liquidity: row.get::<String, _>("liquidity").parse()?,
1638                protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1639                protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1640                fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1641                fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1642                fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1643            };
1644
1645            let analytics = PoolAnalytics {
1646                total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1647                total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1648                total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1649                total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1650                total_swaps: row.get::<i32, _>("total_swaps") as u64,
1651                total_mints: row.get::<i32, _>("total_mints") as u64,
1652                total_burns: row.get::<i32, _>("total_burns") as u64,
1653                total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1654                total_flashes: row.get::<i32, _>("total_flashes") as u64,
1655                liquidity_utilization_rate: row.get::<f64, _>("liquidity_utilization_rate"),
1656            };
1657
1658            // Load positions and ticks
1659            let positions = self
1660                .load_pool_positions_for_snapshot(
1661                    chain_id,
1662                    pool_address,
1663                    block as u64,
1664                    transaction_index as u32,
1665                    log_index as u32,
1666                )
1667                .await?;
1668
1669            let ticks = self
1670                .load_pool_ticks_for_snapshot(
1671                    chain_id,
1672                    pool_address,
1673                    block as u64,
1674                    transaction_index as u32,
1675                    log_index as u32,
1676                )
1677                .await?;
1678
1679            let dex_name: String = row.get("dex_name");
1680            let chain = nautilus_model::defi::Chain::from_chain_id(chain_id)
1681                .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {}", chain_id))?;
1682
1683            let dex_type = nautilus_model::defi::DexType::from_dex_name(&dex_name)
1684                .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {}", dex_name))?;
1685
1686            let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1687                .ok_or_else(|| {
1688                    anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1689                })?;
1690
1691            let instrument_id =
1692                Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_address);
1693
1694            Ok(Some(PoolSnapshot::new(
1695                instrument_id,
1696                state,
1697                positions,
1698                ticks,
1699                analytics,
1700                block_position,
1701            )))
1702        } else {
1703            Ok(None)
1704        }
1705    }
1706
1707    /// Marks a pool snapshot as valid after successful on-chain verification.
1708    ///
1709    /// # Errors
1710    ///
1711    /// Returns an error if the database operation fails.
1712    pub async fn mark_pool_snapshot_valid(
1713        &self,
1714        chain_id: u32,
1715        pool_address: &Address,
1716        block: u64,
1717        transaction_index: u32,
1718        log_index: u32,
1719    ) -> anyhow::Result<()> {
1720        sqlx::query(
1721            r"
1722            UPDATE pool_snapshot
1723            SET is_valid = TRUE
1724            WHERE chain_id = $1
1725            AND pool_address = $2
1726            AND block = $3
1727            AND transaction_index = $4
1728            AND log_index = $5
1729            ",
1730        )
1731        .bind(chain_id as i32)
1732        .bind(pool_address.to_string())
1733        .bind(block as i64)
1734        .bind(transaction_index as i32)
1735        .bind(log_index as i32)
1736        .execute(&self.pool)
1737        .await
1738        .map(|_| ())
1739        .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {}", e))
1740    }
1741
1742    /// Loads all positions for a specific snapshot.
1743    ///
1744    /// # Errors
1745    ///
1746    /// Returns an error if the database query fails.
1747    pub async fn load_pool_positions_for_snapshot(
1748        &self,
1749        chain_id: u32,
1750        pool_address: &Address,
1751        snapshot_block: u64,
1752        snapshot_transaction_index: u32,
1753        snapshot_log_index: u32,
1754    ) -> anyhow::Result<Vec<PoolPosition>> {
1755        let rows = sqlx::query(
1756            r"
1757            SELECT
1758                owner, tick_lower, tick_upper,
1759                liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1760                tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1761                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1762                total_amount0_collected::TEXT, total_amount1_collected::TEXT
1763            FROM pool_position
1764            WHERE chain_id = $1
1765            AND pool_address = $2
1766            AND snapshot_block = $3
1767            AND snapshot_transaction_index = $4
1768            AND snapshot_log_index = $5
1769            ",
1770        )
1771        .bind(chain_id as i32)
1772        .bind(pool_address.to_string())
1773        .bind(snapshot_block as i64)
1774        .bind(snapshot_transaction_index as i32)
1775        .bind(snapshot_log_index as i32)
1776        .fetch_all(&self.pool)
1777        .await
1778        .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {}", e))?;
1779
1780        rows.iter()
1781            .map(|row| {
1782                let owner: String = row.get("owner");
1783                let position = PoolPosition {
1784                    owner: validate_address(&owner)?,
1785                    tick_lower: row.get("tick_lower"),
1786                    tick_upper: row.get("tick_upper"),
1787                    liquidity: row.get::<String, _>("liquidity").parse()?,
1788                    fee_growth_inside_0_last: row
1789                        .get::<String, _>("fee_growth_inside_0_last")
1790                        .parse()?,
1791                    fee_growth_inside_1_last: row
1792                        .get::<String, _>("fee_growth_inside_1_last")
1793                        .parse()?,
1794                    tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1795                    tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1796                    total_amount0_deposited: row
1797                        .get::<String, _>("total_amount0_deposited")
1798                        .parse()?,
1799                    total_amount1_deposited: row
1800                        .get::<String, _>("total_amount1_deposited")
1801                        .parse()?,
1802                    total_amount0_collected: row
1803                        .get::<String, _>("total_amount0_collected")
1804                        .parse()?,
1805                    total_amount1_collected: row
1806                        .get::<String, _>("total_amount1_collected")
1807                        .parse()?,
1808                };
1809                Ok(position)
1810            })
1811            .collect()
1812    }
1813
1814    /// Loads all ticks for a specific snapshot.
1815    ///
1816    /// # Errors
1817    ///
1818    /// Returns an error if the database query fails.
1819    pub async fn load_pool_ticks_for_snapshot(
1820        &self,
1821        chain_id: u32,
1822        pool_address: &Address,
1823        snapshot_block: u64,
1824        snapshot_transaction_index: u32,
1825        snapshot_log_index: u32,
1826    ) -> anyhow::Result<Vec<PoolTick>> {
1827        let rows = sqlx::query(
1828            r"
1829            SELECT
1830                tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1831                fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1832                last_updated_block
1833            FROM pool_tick
1834            WHERE chain_id = $1
1835            AND pool_address = $2
1836            AND snapshot_block = $3
1837            AND snapshot_transaction_index = $4
1838            AND snapshot_log_index = $5
1839            ",
1840        )
1841        .bind(chain_id as i32)
1842        .bind(pool_address.to_string())
1843        .bind(snapshot_block as i64)
1844        .bind(snapshot_transaction_index as i32)
1845        .bind(snapshot_log_index as i32)
1846        .fetch_all(&self.pool)
1847        .await
1848        .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {}", e))?;
1849
1850        rows.iter()
1851            .map(|row| {
1852                let tick = PoolTick::new(
1853                    row.get("tick_value"),
1854                    row.get::<String, _>("liquidity_gross").parse()?,
1855                    row.get::<String, _>("liquidity_net").parse()?,
1856                    row.get::<String, _>("fee_growth_outside_0").parse()?,
1857                    row.get::<String, _>("fee_growth_outside_1").parse()?,
1858                    row.get("initialized"),
1859                    row.get::<i64, _>("last_updated_block") as u64,
1860                );
1861                Ok(tick)
1862            })
1863            .collect()
1864    }
1865
1866    /// Streams pool events from all event tables (swap, liquidity, collect) for a specific pool.
1867    ///
1868    /// Creates a unified stream of pool events from multiple tables, ordering them chronologically
1869    /// by block number, transaction index, and log index. Optionally resumes from a specific block position.
1870    ///
1871    /// # Returns
1872    ///
1873    /// A stream of `DexPoolData` events in chronological order.
1874    ///
1875    /// # Errors
1876    ///
1877    /// Returns an error if the database query fails or if event transformation fails.
1878    pub fn stream_pool_events<'a>(
1879        &'a self,
1880        chain: SharedChain,
1881        dex: SharedDex,
1882        instrument_id: InstrumentId,
1883        pool_address: &Address,
1884        from_position: Option<BlockPosition>,
1885    ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1886        // Query without position filter (streams all events)
1887        const QUERY_ALL: &str = r"
1888            (SELECT
1889                'swap' as event_type,
1890                chain_id,
1891                pool_address,
1892                block,
1893                transaction_hash,
1894                transaction_index,
1895                log_index,
1896                sender,
1897                recipient,
1898                NULL::TEXT as owner,
1899                side,
1900                size,
1901                price,
1902                sqrt_price_x96::TEXT,
1903                liquidity::TEXT as swap_liquidity,
1904                tick as swap_tick,
1905                amount0::TEXT as swap_amount0,
1906                amount1::TEXT as swap_amount1,
1907                NULL::TEXT as position_liquidity,
1908                NULL::TEXT as amount0,
1909                NULL::TEXT as amount1,
1910                NULL::INT as tick_lower,
1911                NULL::INT as tick_upper,
1912                NULL::TEXT as liquidity_event_type,
1913                NULL::TEXT as flash_amount0,
1914                NULL::TEXT as flash_amount1,
1915                NULL::TEXT as flash_paid0,
1916                NULL::TEXT as flash_paid1
1917            FROM pool_swap_event
1918            WHERE chain_id = $1 AND pool_address = $2)
1919            UNION ALL
1920            (SELECT
1921                'liquidity' as event_type,
1922                chain_id,
1923                pool_address,
1924                block,
1925                transaction_hash,
1926                transaction_index,
1927                log_index,
1928                sender,
1929                NULL::TEXT as recipient,
1930                owner,
1931                NULL::TEXT as side,
1932                NULL::TEXT as size,
1933                NULL::text as price,
1934                NULL::text as sqrt_price_x96,
1935                NULL::TEXT as swap_liquidity,
1936                NULL::INT as swap_tick,
1937                amount0::TEXT as swap_amount0,
1938                amount1::TEXT as swap_amount1,
1939                position_liquidity::TEXT,
1940                amount0::TEXT,
1941                amount1::TEXT,
1942                tick_lower::INT,
1943                tick_upper::INT,
1944                event_type as liquidity_event_type,
1945                NULL::TEXT as flash_amount0,
1946                NULL::TEXT as flash_amount1,
1947                NULL::TEXT as flash_paid0,
1948                NULL::TEXT as flash_paid1
1949            FROM pool_liquidity_event
1950            WHERE chain_id = $1 AND pool_address = $2)
1951            UNION ALL
1952            (SELECT
1953                'collect' as event_type,
1954                chain_id,
1955                pool_address,
1956                block,
1957                transaction_hash,
1958                transaction_index,
1959                log_index,
1960                NULL::TEXT as sender,
1961                NULL::TEXT as recipient,
1962                owner,
1963                NULL::TEXT as side,
1964                NULL::TEXT as size,
1965                NULL::TEXT as price,
1966                NULL::TEXT as sqrt_price_x96,
1967                NULL::TEXT as swap_liquidity,
1968                NULL::INT AS swap_tick,
1969                amount0::TEXT as swap_amount0,
1970                amount1::TEXT as swap_amount1,
1971                NULL::TEXT as position_liquidity,
1972                amount0::TEXT,
1973                amount1::TEXT,
1974                tick_lower::INT,
1975                tick_upper::INT,
1976                NULL::TEXT as liquidity_event_type,
1977                NULL::TEXT as flash_amount0,
1978                NULL::TEXT as flash_amount1,
1979                NULL::TEXT as flash_paid0,
1980                NULL::TEXT as flash_paid1
1981            FROM pool_collect_event
1982            WHERE chain_id = $1 AND pool_address = $2)
1983            UNION ALL
1984            (SELECT
1985                'flash' as event_type,
1986                chain_id,
1987                pool_address,
1988                block,
1989                transaction_hash,
1990                transaction_index,
1991                log_index,
1992                sender,
1993                recipient,
1994                NULL::TEXT as owner,
1995                NULL::TEXT as side,
1996                NULL::TEXT as size,
1997                NULL::TEXT as price,
1998                NULL::TEXT as sqrt_price_x96,
1999                NULL::TEXT as swap_liquidity,
2000                NULL::INT AS swap_tick,
2001                NULL::TEXT as swap_amount0,
2002                NULL::TEXT as swap_amount1,
2003                NULL::TEXT as position_liquidity,
2004                NULL::TEXT as amount0,
2005                NULL::TEXT as amount1,
2006                NULL::INT as tick_lower,
2007                NULL::INT as tick_upper,
2008                NULL::TEXT as liquidity_event_type,
2009                amount0::TEXT as flash_amount0,
2010                amount1::TEXT as flash_amount1,
2011                paid0::TEXT as flash_paid0,
2012                paid1::TEXT as flash_paid1
2013            FROM pool_flash_event
2014            WHERE chain_id = $1 AND pool_address = $2)
2015            ORDER BY block, transaction_index, log_index";
2016
2017        // Query with position filter (resumes from specific block position)
2018        const QUERY_FROM_POSITION: &str = r"
2019            (SELECT
2020                'swap' as event_type,
2021                chain_id,
2022                pool_address,
2023                block,
2024                transaction_hash,
2025                transaction_index,
2026                log_index,
2027                sender,
2028                recipient,
2029                NULL::TEXT as owner,
2030                side,
2031                size,
2032                price,
2033                sqrt_price_x96::TEXT,
2034                liquidity::TEXT as swap_liquidity,
2035                tick as swap_tick,
2036                amount0::TEXT as swap_amount0,
2037                amount1::TEXT as swap_amount1,
2038                NULL::TEXT as position_liquidity,
2039                NULL::TEXT as amount0,
2040                NULL::TEXT as amount1,
2041                NULL::INT as tick_lower,
2042                NULL::INT as tick_upper,
2043                NULL::TEXT as liquidity_event_type,
2044                NULL::TEXT as flash_amount0,
2045                NULL::TEXT as flash_amount1,
2046                NULL::TEXT as flash_paid0,
2047                NULL::TEXT as flash_paid1
2048            FROM pool_swap_event
2049            WHERE chain_id = $1 AND pool_address = $2
2050            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2051            UNION ALL
2052            (SELECT
2053                'liquidity' as event_type,
2054                chain_id,
2055                pool_address,
2056                block,
2057                transaction_hash,
2058                transaction_index,
2059                log_index,
2060                sender,
2061                NULL::TEXT as recipient,
2062                owner,
2063                NULL::TEXT as side,
2064                NULL::TEXT as size,
2065                NULL::text as price,
2066                NULL::text as sqrt_price_x96,
2067                NULL::TEXT as swap_liquidity,
2068                NULL::INT as swap_tick,
2069                amount0::TEXT as swap_amount0,
2070                amount1::TEXT as swap_amount1,
2071                position_liquidity::TEXT,
2072                amount0::TEXT,
2073                amount1::TEXT,
2074                tick_lower::INT,
2075                tick_upper::INT,
2076                event_type as liquidity_event_type,
2077                NULL::TEXT as flash_amount0,
2078                NULL::TEXT as flash_amount1,
2079                NULL::TEXT as flash_paid0,
2080                NULL::TEXT as flash_paid1
2081            FROM pool_liquidity_event
2082            WHERE chain_id = $1 AND pool_address = $2
2083            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2084            UNION ALL
2085            (SELECT
2086                'collect' as event_type,
2087                chain_id,
2088                pool_address,
2089                block,
2090                transaction_hash,
2091                transaction_index,
2092                log_index,
2093                NULL::TEXT as sender,
2094                NULL::TEXT as recipient,
2095                owner,
2096                NULL::TEXT as side,
2097                NULL::TEXT as size,
2098                NULL::TEXT as price,
2099                NULL::TEXT as sqrt_price_x96,
2100                NULL::TEXT as swap_liquidity,
2101                NULL::INT AS swap_tick,
2102                amount0::TEXT as swap_amount0,
2103                amount1::TEXT as swap_amount1,
2104                NULL::TEXT as position_liquidity,
2105                amount0::TEXT,
2106                amount1::TEXT,
2107                tick_lower::INT,
2108                tick_upper::INT,
2109                NULL::TEXT as liquidity_event_type,
2110                NULL::TEXT as flash_amount0,
2111                NULL::TEXT as flash_amount1,
2112                NULL::TEXT as flash_paid0,
2113                NULL::TEXT as flash_paid1
2114            FROM pool_collect_event
2115            WHERE chain_id = $1 AND pool_address = $2
2116            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2117            UNION ALL
2118            (SELECT
2119                'flash' as event_type,
2120                chain_id,
2121                pool_address,
2122                block,
2123                transaction_hash,
2124                transaction_index,
2125                log_index,
2126                sender,
2127                recipient,
2128                NULL::TEXT as owner,
2129                NULL::TEXT as side,
2130                NULL::TEXT as size,
2131                NULL::TEXT as price,
2132                NULL::TEXT as sqrt_price_x96,
2133                NULL::TEXT as swap_liquidity,
2134                NULL::INT AS swap_tick,
2135                NULL::TEXT as swap_amount0,
2136                NULL::TEXT as swap_amount1,
2137                NULL::TEXT as position_liquidity,
2138                NULL::TEXT as amount0,
2139                NULL::TEXT as amount1,
2140                NULL::INT as tick_lower,
2141                NULL::INT as tick_upper,
2142                NULL::TEXT as liquidity_event_type,
2143                amount0::TEXT as flash_amount0,
2144                amount1::TEXT as flash_amount1,
2145                paid0::TEXT as flash_paid0,
2146                paid1::TEXT as flash_paid1
2147            FROM pool_flash_event
2148            WHERE chain_id = $1 AND pool_address = $2
2149            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2150            ORDER BY block, transaction_index, log_index";
2151
2152        // Build query with appropriate bindings
2153        let query = if let Some(pos) = from_position {
2154            sqlx::query(QUERY_FROM_POSITION)
2155                .bind(chain.chain_id as i32)
2156                .bind(pool_address.to_string())
2157                .bind(pos.number as i64)
2158                .bind(pos.transaction_index as i32)
2159                .bind(pos.log_index as i32)
2160                .fetch(&self.pool)
2161        } else {
2162            sqlx::query(QUERY_ALL)
2163                .bind(chain.chain_id as i32)
2164                .bind(pool_address.to_string())
2165                .fetch(&self.pool)
2166        };
2167
2168        // Transform rows to events
2169        let stream = query.map(move |row_result| match row_result {
2170            Ok(row) => {
2171                transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2172                    .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e))
2173            }
2174            Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {}", e)),
2175        });
2176
2177        Box::pin(stream)
2178    }
2179}