nautilus_blockchain/cache/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21    defi::{
22        Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
23        data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24        pool_analysis::{
25            position::PoolPosition,
26            snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
27        },
28        tick_map::tick::PoolTick,
29        validation::validate_address,
30    },
31    identifiers::InstrumentId,
32};
33use sqlx::{PgPool, Row, postgres::PgConnectOptions};
34
35use crate::{
36    cache::{
37        consistency::CachedBlocksConsistencyStatus,
38        copy::PostgresCopyHandler,
39        rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
40        types::{U128Pg, U256Pg},
41    },
42    events::initialize::InitializeEvent,
43};
44
/// Database interface for persisting and retrieving blockchain entities and domain objects.
///
/// Holds the PostgreSQL connection pool (created by [`Self::init`]) against which the
/// query methods of this type are executed.
#[derive(Debug)]
pub struct BlockchainCacheDatabase {
    /// PostgreSQL connection pool used for database operations.
    pool: PgPool,
}
51
52impl BlockchainCacheDatabase {
53    /// Initializes a new database instance by establishing a connection to PostgreSQL.
54    ///
55    /// # Panics
56    ///
57    /// Panics if unable to connect to PostgreSQL with the provided options.
58    pub async fn init(pg_options: PgConnectOptions) -> Self {
59        let pool = sqlx::postgres::PgPoolOptions::new()
60            .max_connections(32) // Increased from default 10
61            .min_connections(5) // Keep some connections warm
62            .acquire_timeout(std::time::Duration::from_secs(3))
63            .connect_with(pg_options)
64            .await
65            .expect("Error connecting to Postgres");
66        Self { pool }
67    }
68
69    /// Seeds the database with a blockchain chain record.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the database operation fails.
74    pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
75        sqlx::query(
76            r"
77            INSERT INTO chain (
78                chain_id, name
79            ) VALUES ($1,$2)
80            ON CONFLICT (chain_id)
81            DO NOTHING
82        ",
83        )
84        .bind(chain.chain_id as i32)
85        .bind(chain.name.to_string())
86        .execute(&self.pool)
87        .await
88        .map(|_| ())
89        .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
90    }
91
92    /// Creates a table partition for the block table specific to the given chain
93    /// by calling the existing PostgreSQL function `create_block_partition`.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the database operation fails.
98    pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
99        let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
100            .bind(chain.chain_id as i32)
101            .fetch_one(&self.pool)
102            .await
103            .map_err(|e| {
104                anyhow::anyhow!(
105                    "Failed to call create_block_partition for chain {}: {e}",
106                    chain.chain_id
107                )
108            })?;
109
110        Ok(result.0)
111    }
112
113    /// Creates a table partition for the token table specific to the given chain
114    /// by calling the existing PostgreSQL function `create_token_partition`.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the database operation fails.
119    pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
120        let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
121            .bind(chain.chain_id as i32)
122            .fetch_one(&self.pool)
123            .await
124            .map_err(|e| {
125                anyhow::anyhow!(
126                    "Failed to call create_token_partition for chain {}: {e}",
127                    chain.chain_id
128                )
129            })?;
130
131        Ok(result.0)
132    }
133
134    /// Returns the highest block number that maintains data continuity in the database.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if the database query fails.
139    pub async fn get_block_consistency_status(
140        &self,
141        chain: &Chain,
142    ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
143        tracing::info!("Fetching block consistency status");
144
145        let result: (i64, i64) = sqlx::query_as(
146            r"
147            SELECT
148                COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
149                get_last_continuous_block($1) as last_continuous_block
150            "
151        )
152        .bind(chain.chain_id as i32)
153        .fetch_one(&self.pool)
154        .await
155        .map_err(|e| {
156            anyhow::anyhow!(
157                "Failed to get block info for chain {}: {}",
158                chain.chain_id,
159                e
160            )
161        })?;
162
163        Ok(CachedBlocksConsistencyStatus::new(
164            result.0 as u64,
165            result.1 as u64,
166        ))
167    }
168
169    /// Inserts or updates a block record in the database.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the database operation fails.
174    pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
175        sqlx::query(
176            r"
177            INSERT INTO block (
178                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
179                base_fee_per_gas, blob_gas_used, excess_blob_gas,
180                l1_gas_price, l1_gas_used, l1_fee_scalar
181            ) VALUES (
182                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
183            )
184            ON CONFLICT (chain_id, number)
185            DO UPDATE
186            SET
187                hash = $3,
188                parent_hash = $4,
189                miner = $5,
190                gas_limit = $6,
191                gas_used = $7,
192                timestamp = $8,
193                base_fee_per_gas = $9,
194                blob_gas_used = $10,
195                excess_blob_gas = $11,
196                l1_gas_price = $12,
197                l1_gas_used = $13,
198                l1_fee_scalar = $14
199        ",
200        )
201        .bind(chain_id as i32)
202        .bind(block.number as i64)
203        .bind(block.hash.as_str())
204        .bind(block.parent_hash.as_str())
205        .bind(block.miner.as_str())
206        .bind(block.gas_limit as i64)
207        .bind(block.gas_used as i64)
208        .bind(block.timestamp.to_string())
209        .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
210        .bind(block.blob_gas_used.as_ref().map(U256::to_string))
211        .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
212        .bind(block.l1_gas_price.as_ref().map(U256::to_string))
213        .bind(block.l1_gas_used.map(|v| v as i64))
214        .bind(block.l1_fee_scalar.map(|v| v as i64))
215        .execute(&self.pool)
216        .await
217        .map(|_| ())
218        .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
219    }
220
221    /// Inserts multiple blocks in a single database operation using UNNEST for optimal performance.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the database operation fails.
226    pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
227        if blocks.is_empty() {
228            return Ok(());
229        }
230
231        // Prepare vectors for each column
232        let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
233        let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
234        let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
235        let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
236        let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
237        let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
238        let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
239        let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
240        let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
241        let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242        let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243        let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
244        let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
245
246        // Fill vectors from blocks
247        for block in blocks {
248            numbers.push(block.number as i64);
249            hashes.push(block.hash.clone());
250            parent_hashes.push(block.parent_hash.clone());
251            miners.push(block.miner.to_string());
252            gas_limits.push(block.gas_limit as i64);
253            gas_useds.push(block.gas_used as i64);
254            timestamps.push(block.timestamp.to_string());
255            base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
256            blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
257            excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
258            l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
259            l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
260            l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
261        }
262
263        // Execute batch insert with UNNEST
264        sqlx::query(
265            r"
266            INSERT INTO block (
267                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
268                base_fee_per_gas, blob_gas_used, excess_blob_gas,
269                l1_gas_price, l1_gas_used, l1_fee_scalar
270            )
271            SELECT
272                $1, *
273            FROM UNNEST(
274                $2::int8[], $3::text[], $4::text[], $5::text[],
275                $6::int8[], $7::int8[], $8::text[],
276                $9::text[], $10::text[], $11::text[],
277                $12::text[], $13::int8[], $14::int8[]
278            )
279            ON CONFLICT (chain_id, number) DO NOTHING
280           ",
281        )
282        .bind(chain_id as i32)
283        .bind(&numbers[..])
284        .bind(&hashes[..])
285        .bind(&parent_hashes[..])
286        .bind(&miners[..])
287        .bind(&gas_limits[..])
288        .bind(&gas_useds[..])
289        .bind(&timestamps[..])
290        .bind(&base_fee_per_gases as &[Option<String>])
291        .bind(&blob_gas_useds as &[Option<String>])
292        .bind(&excess_blob_gases as &[Option<String>])
293        .bind(&l1_gas_prices as &[Option<String>])
294        .bind(&l1_gas_useds as &[Option<i64>])
295        .bind(&l1_fee_scalars as &[Option<i64>])
296        .execute(&self.pool)
297        .await
298        .map(|_| ())
299        .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
300    }
301
302    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
303    ///
304    /// This method is significantly faster than INSERT for bulk operations as it bypasses
305    /// SQL parsing and uses PostgreSQL's native binary protocol.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the COPY operation fails.
310    pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
311        let copy_handler = PostgresCopyHandler::new(&self.pool);
312        copy_handler.copy_blocks(chain_id, blocks).await
313    }
314
315    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
316    ///
317    /// This method is significantly faster than INSERT for bulk operations as it bypasses
318    /// SQL parsing and uses PostgreSQL's native binary protocol.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if the COPY operation fails.
323    pub async fn add_pool_swaps_copy(
324        &self,
325        chain_id: u32,
326        swaps: &[PoolSwap],
327    ) -> anyhow::Result<()> {
328        let copy_handler = PostgresCopyHandler::new(&self.pool);
329        copy_handler.copy_pool_swaps(chain_id, swaps).await
330    }
331
332    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
333    ///
334    /// This method is significantly faster than INSERT for bulk operations as it bypasses
335    /// SQL parsing and uses PostgreSQL's native binary protocol.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if the COPY operation fails.
340    pub async fn add_pool_liquidity_updates_copy(
341        &self,
342        chain_id: u32,
343        updates: &[PoolLiquidityUpdate],
344    ) -> anyhow::Result<()> {
345        let copy_handler = PostgresCopyHandler::new(&self.pool);
346        copy_handler
347            .copy_pool_liquidity_updates(chain_id, updates)
348            .await
349    }
350
351    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
352    ///
353    /// This method is significantly faster than INSERT for bulk operations as it bypasses
354    /// SQL parsing and most database validation checks.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if the COPY operation fails.
359    pub async fn copy_pool_fee_collects_batch(
360        &self,
361        chain_id: u32,
362        collects: &[PoolFeeCollect],
363    ) -> anyhow::Result<()> {
364        let copy_handler = PostgresCopyHandler::new(&self.pool);
365        copy_handler.copy_pool_collects(chain_id, collects).await
366    }
367
368    /// Retrieves block timestamps for a given chain starting from a specific block number.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the database query fails.
373    pub async fn load_block_timestamps(
374        &self,
375        chain: SharedChain,
376        from_block: u64,
377    ) -> anyhow::Result<Vec<BlockTimestampRow>> {
378        sqlx::query_as::<_, BlockTimestampRow>(
379            r"
380            SELECT
381                number,
382                timestamp
383            FROM block
384            WHERE chain_id = $1 AND number >= $2
385            ORDER BY number ASC
386            ",
387        )
388        .bind(chain.chain_id as i32)
389        .bind(from_block as i64)
390        .fetch_all(&self.pool)
391        .await
392        .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
393    }
394
395    /// Adds or updates a DEX (Decentralized Exchange) record in the database.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if the database operation fails.
400    pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
401        sqlx::query(
402            r"
403            INSERT INTO dex (
404                chain_id, name, factory_address, creation_block
405            ) VALUES ($1, $2, $3, $4)
406            ON CONFLICT (chain_id, name)
407            DO UPDATE
408            SET
409                factory_address = $3,
410                creation_block = $4
411        ",
412        )
413        .bind(dex.chain.chain_id as i32)
414        .bind(dex.name.to_string())
415        .bind(dex.factory.to_string())
416        .bind(dex.factory_creation_block as i64)
417        .execute(&self.pool)
418        .await
419        .map(|_| ())
420        .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
421    }
422
423    /// Adds or updates a liquidity pool/pair record in the database.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the database operation fails.
428    pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
429        sqlx::query(
430            r"
431            INSERT INTO pool (
432                chain_id, address, dex_name, creation_block,
433                token0_chain, token0_address,
434                token1_chain, token1_address,
435                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
436            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
437            ON CONFLICT (chain_id, address)
438            DO UPDATE
439            SET
440                dex_name = $3,
441                creation_block = $4,
442                token0_chain = $5,
443                token0_address = $6,
444                token1_chain = $7,
445                token1_address = $8,
446                fee = $9,
447                tick_spacing = $10,
448                initial_tick = $11,
449                initial_sqrt_price_x96 = $12
450        ",
451        )
452        .bind(pool.chain.chain_id as i32)
453        .bind(pool.address.to_string())
454        .bind(pool.dex.name.to_string())
455        .bind(pool.creation_block as i64)
456        .bind(pool.token0.chain.chain_id as i32)
457        .bind(pool.token0.address.to_string())
458        .bind(pool.token1.chain.chain_id as i32)
459        .bind(pool.token1.address.to_string())
460        .bind(pool.fee.map(|fee| fee as i32))
461        .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
462        .bind(pool.initial_tick)
463        .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
464        .execute(&self.pool)
465        .await
466        .map(|_| ())
467        .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
468    }
469
470    /// Inserts multiple pools in a single database operation using UNNEST for optimal performance.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the database operation fails.
475    pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
476        if pools.is_empty() {
477            return Ok(());
478        }
479
480        // Prepare vectors for each column
481        let len = pools.len();
482        let mut addresses: Vec<String> = Vec::with_capacity(len);
483        let mut dex_names: Vec<String> = Vec::with_capacity(len);
484        let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
485        let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
486        let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
487        let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
488        let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
489        let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
490        let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
491        let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
492        let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
493        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
494
495        // Fill vectors from pools
496        for pool in pools {
497            chain_ids.push(pool.chain.chain_id as i32);
498            addresses.push(pool.address.to_string());
499            dex_names.push(pool.dex.name.to_string());
500            creation_blocks.push(pool.creation_block as i64);
501            token0_chains.push(pool.token0.chain.chain_id as i32);
502            token0_addresses.push(pool.token0.address.to_string());
503            token1_chains.push(pool.token1.chain.chain_id as i32);
504            token1_addresses.push(pool.token1.address.to_string());
505            fees.push(pool.fee.map(|fee| fee as i32));
506            tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
507            initial_ticks.push(pool.initial_tick);
508            initial_sqrt_price_x96s
509                .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
510        }
511
512        // Execute batch insert with UNNEST
513        sqlx::query(
514            r"
515            INSERT INTO pool (
516                chain_id, address, dex_name, creation_block,
517                token0_chain, token0_address,
518                token1_chain, token1_address,
519                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
520            )
521            SELECT *
522            FROM UNNEST(
523                $1::int4[], $2::text[], $3::text[], $4::int8[],
524                $5::int4[], $6::text[], $7::int4[], $8::text[],
525                $9::int4[], $10::int4[], $11::int4[], $12::text[]
526            )
527            ON CONFLICT (chain_id, address) DO NOTHING
528           ",
529        )
530        .bind(&chain_ids[..])
531        .bind(&addresses[..])
532        .bind(&dex_names[..])
533        .bind(&creation_blocks[..])
534        .bind(&token0_chains[..])
535        .bind(&token0_addresses[..])
536        .bind(&token1_chains[..])
537        .bind(&token1_addresses[..])
538        .bind(&fees[..])
539        .bind(&tick_spacings[..])
540        .bind(&initial_ticks[..])
541        .bind(&initial_sqrt_price_x96s[..])
542        .execute(&self.pool)
543        .await
544        .map(|_| ())
545        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
546    }
547
548    /// Inserts multiple pool swaps in a single database operation using UNNEST for optimal performance.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if the database operation fails.
553    pub async fn add_pool_swaps_batch(
554        &self,
555        chain_id: u32,
556        swaps: &[PoolSwap],
557    ) -> anyhow::Result<()> {
558        if swaps.is_empty() {
559            return Ok(());
560        }
561
562        // Prepare vectors for each column
563        let len = swaps.len();
564        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
565        let mut blocks: Vec<i64> = Vec::with_capacity(len);
566        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
567        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
568        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
569        let mut senders: Vec<String> = Vec::with_capacity(len);
570        let mut recipients: Vec<String> = Vec::with_capacity(len);
571        let mut sides: Vec<Option<String>> = Vec::with_capacity(len);
572        let mut sizes: Vec<Option<String>> = Vec::with_capacity(len);
573        let mut prices: Vec<Option<String>> = Vec::with_capacity(len);
574        let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
575        let mut liquidities: Vec<String> = Vec::with_capacity(len);
576        let mut ticks: Vec<i32> = Vec::with_capacity(len);
577        let mut amount0s: Vec<String> = Vec::with_capacity(len);
578        let mut amount1s: Vec<String> = Vec::with_capacity(len);
579        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
580
581        // Fill vectors from swaps
582        for swap in swaps {
583            chain_ids.push(chain_id as i32);
584            pool_addresses.push(swap.pool_address.to_string());
585            blocks.push(swap.block as i64);
586            transaction_hashes.push(swap.transaction_hash.clone());
587            transaction_indices.push(swap.transaction_index as i32);
588            log_indices.push(swap.log_index as i32);
589            senders.push(swap.sender.to_string());
590            recipients.push(swap.recipient.to_string());
591            sides.push(swap.side.map(|side| side.to_string()));
592            sizes.push(swap.size.map(|size| size.to_string()));
593            prices.push(swap.price.map(|price| price.to_string()));
594            sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
595            liquidities.push(swap.liquidity.to_string());
596            ticks.push(swap.tick);
597            amount0s.push(swap.amount0.to_string());
598            amount1s.push(swap.amount1.to_string());
599        }
600
601        // Execute batch insert with UNNEST
602        sqlx::query(
603            r"
604            INSERT INTO pool_swap_event (
605                chain_id, pool_address, block, transaction_hash, transaction_index,
606                log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1
607            )
608            SELECT
609                chain_id, pool_address, block, transaction_hash, transaction_index, log_index, sender, recipient,
610                side, size, price, sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256
611            FROM UNNEST(
612                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[], $6::INT[],
613                $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[],
614                $12::TEXT[], $13::TEXT[], $14::INT[], $15::TEXT[], $16::TEXT[]
615            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
616                   log_index, sender, recipient, side, size, price, sqrt_price_x96, liquidity, tick, amount0, amount1)
617            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
618           ",
619        )
620        .bind(&chain_ids[..])
621        .bind(&pool_addresses[..])
622        .bind(&blocks[..])
623        .bind(&transaction_hashes[..])
624        .bind(&transaction_indices[..])
625        .bind(&log_indices[..])
626        .bind(&senders[..])
627        .bind(&recipients[..])
628        .bind(&sides[..])
629        .bind(&sizes[..])
630        .bind(&prices[..])
631        .bind(&sqrt_price_x96s[..])
632        .bind(&liquidities[..])
633        .bind(&ticks[..])
634        .bind(&amount0s[..])
635        .bind(&amount1s[..])
636        .execute(&self.pool)
637        .await
638        .map(|_| ())
639        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
640    }
641
642    /// Inserts multiple pool liquidity updates in a single database operation using UNNEST for optimal performance.
643    ///
644    /// # Errors
645    ///
646    /// Returns an error if the database operation fails.
647    pub async fn add_pool_liquidity_updates_batch(
648        &self,
649        chain_id: u32,
650        updates: &[PoolLiquidityUpdate],
651    ) -> anyhow::Result<()> {
652        if updates.is_empty() {
653            return Ok(());
654        }
655
656        // Prepare vectors for each column
657        let len = updates.len();
658        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
659        let mut blocks: Vec<i64> = Vec::with_capacity(len);
660        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
661        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
662        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
663        let mut event_types: Vec<String> = Vec::with_capacity(len);
664        let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
665        let mut owners: Vec<String> = Vec::with_capacity(len);
666        let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
667        let mut amount0s: Vec<String> = Vec::with_capacity(len);
668        let mut amount1s: Vec<String> = Vec::with_capacity(len);
669        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
670        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
671        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
672
673        // Fill vectors from updates
674        for update in updates {
675            chain_ids.push(chain_id as i32);
676            pool_addresses.push(update.pool_address.to_string());
677            blocks.push(update.block as i64);
678            transaction_hashes.push(update.transaction_hash.clone());
679            transaction_indices.push(update.transaction_index as i32);
680            log_indices.push(update.log_index as i32);
681            event_types.push(update.kind.to_string());
682            senders.push(update.sender.map(|s| s.to_string()));
683            owners.push(update.owner.to_string());
684            position_liquidities.push(update.position_liquidity.to_string());
685            amount0s.push(update.amount0.to_string());
686            amount1s.push(update.amount1.to_string());
687            tick_lowers.push(update.tick_lower);
688            tick_uppers.push(update.tick_upper);
689        }
690
691        // Execute batch insert with UNNEST
692        sqlx::query(
693            r"
694            INSERT INTO pool_liquidity_event (
695                chain_id, pool_address, block, transaction_hash, transaction_index,
696                log_index, event_type, sender, owner, position_liquidity,
697                amount0, amount1, tick_lower, tick_upper
698            )
699            SELECT
700                chain_id, pool_address, block, transaction_hash, transaction_index,
701                log_index, event_type, sender, owner, position_liquidity::u128,
702                amount0::U256, amount1::U256, tick_lower, tick_upper
703            FROM UNNEST(
704                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
705                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[],
706                $11::TEXT[], $12::TEXT[], $13::INT[], $14::INT[]
707            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
708                   log_index, event_type, sender, owner, position_liquidity,
709                   amount0, amount1, tick_lower, tick_upper)
710            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
711           ",
712        )
713        .bind(&chain_ids[..])
714        .bind(&pool_addresses[..])
715        .bind(&blocks[..])
716        .bind(&transaction_hashes[..])
717        .bind(&transaction_indices[..])
718        .bind(&log_indices[..])
719        .bind(&event_types[..])
720        .bind(&senders[..])
721        .bind(&owners[..])
722        .bind(&position_liquidities[..])
723        .bind(&amount0s[..])
724        .bind(&amount1s[..])
725        .bind(&tick_lowers[..])
726        .bind(&tick_uppers[..])
727        .execute(&self.pool)
728        .await
729        .map(|_| ())
730        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
731    }
732
733    /// Adds or updates a token record in the database.
734    ///
735    /// # Errors
736    ///
737    /// Returns an error if the database operation fails.
738    pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
739        sqlx::query(
740            r"
741            INSERT INTO token (
742                chain_id, address, name, symbol, decimals
743            ) VALUES ($1, $2, $3, $4, $5)
744            ON CONFLICT (chain_id, address)
745            DO UPDATE
746            SET
747                name = $3,
748                symbol = $4,
749                decimals = $5
750        ",
751        )
752        .bind(token.chain.chain_id as i32)
753        .bind(token.address.to_string())
754        .bind(token.name.as_str())
755        .bind(token.symbol.as_str())
756        .bind(i32::from(token.decimals))
757        .execute(&self.pool)
758        .await
759        .map(|_| ())
760        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
761    }
762
763    /// Records an invalid token address with associated error information.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the database insertion fails.
768    pub async fn add_invalid_token(
769        &self,
770        chain_id: u32,
771        address: &Address,
772        error_string: &str,
773    ) -> anyhow::Result<()> {
774        sqlx::query(
775            r"
776            INSERT INTO token (
777                chain_id, address, error
778            ) VALUES ($1, $2, $3)
779            ON CONFLICT (chain_id, address)
780            DO NOTHING;
781        ",
782        )
783        .bind(chain_id as i32)
784        .bind(address.to_string())
785        .bind(error_string)
786        .execute(&self.pool)
787        .await
788        .map(|_| ())
789        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
790    }
791
792    /// Persists a token swap transaction event to the `pool_swap` table.
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the database operation fails.
797    pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
798        sqlx::query(
799            r"
800            INSERT INTO pool_swap_event (
801                chain_id, pool_address, block, transaction_hash, transaction_index,
802                log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
803            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
804            ON CONFLICT (chain_id, transaction_hash, log_index)
805            DO NOTHING
806        ",
807        )
808        .bind(chain_id as i32)
809        .bind(swap.pool_address.to_string())
810        .bind(swap.block as i64)
811        .bind(swap.transaction_hash.as_str())
812        .bind(swap.transaction_index as i32)
813        .bind(swap.log_index as i32)
814        .bind(swap.sender.to_string())
815        .bind(swap.recipient.to_string())
816        .bind(swap.side.map(|side| side.to_string()))
817        .bind(swap.size.map(|size| size.to_string()))
818        .bind(swap.price.map(|price| price.to_string()))
819        .bind(swap.sqrt_price_x96.to_string())
820        .bind(swap.amount0.to_string())
821        .bind(swap.amount1.to_string())
822        .execute(&self.pool)
823        .await
824        .map(|_| ())
825        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
826    }
827
828    /// Persists a liquidity position change (mint/burn) event to the `pool_liquidity` table.
829    ///
830    /// # Errors
831    ///
832    /// Returns an error if the database operation fails.
833    pub async fn add_pool_liquidity_update(
834        &self,
835        chain_id: u32,
836        liquidity_update: &PoolLiquidityUpdate,
837    ) -> anyhow::Result<()> {
838        sqlx::query(
839            r"
840            INSERT INTO pool_liquidity_event (
841                chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
842                event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
843            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
844            ON CONFLICT (chain_id, transaction_hash, log_index)
845            DO NOTHING
846        ",
847        )
848        .bind(chain_id as i32)
849        .bind(liquidity_update.pool_address.to_string())
850        .bind(liquidity_update.block as i64)
851        .bind(liquidity_update.transaction_hash.as_str())
852        .bind(liquidity_update.transaction_index as i32)
853        .bind(liquidity_update.log_index as i32)
854        .bind(liquidity_update.kind.to_string())
855        .bind(liquidity_update.sender.map(|sender| sender.to_string()))
856        .bind(liquidity_update.owner.to_string())
857        .bind(U128Pg(liquidity_update.position_liquidity))
858        .bind(U256Pg(liquidity_update.amount0))
859        .bind(U256Pg(liquidity_update.amount1))
860        .bind(liquidity_update.tick_lower)
861        .bind(liquidity_update.tick_upper)
862        .execute(&self.pool)
863        .await
864        .map(|_| ())
865        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
866    }
867
868    /// Retrieves all valid token records for the given chain and converts them into `Token` domain objects.
869    ///
870    /// Only returns tokens that do not contain error information, filtering out invalid tokens
871    /// that were previously recorded with error details.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if the database query fails.
876    pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
877        sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
878            .bind(chain.chain_id as i32)
879            .fetch_all(&self.pool)
880            .await
881            .map(|rows| {
882                rows.into_iter()
883                    .map(|token_row| {
884                        Token::new(
885                            chain.clone(),
886                            token_row.address,
887                            token_row.name,
888                            token_row.symbol,
889                            token_row.decimals as u8,
890                        )
891                    })
892                    .collect::<Vec<_>>()
893            })
894            .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
895    }
896
897    /// Retrieves all invalid token addresses for a given chain.
898    ///
899    /// # Errors
900    ///
901    /// Returns an error if the database query fails or address validation fails.
902    pub async fn load_invalid_token_addresses(
903        &self,
904        chain_id: u32,
905    ) -> anyhow::Result<Vec<Address>> {
906        sqlx::query_as::<_, (String,)>(
907            "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
908        )
909        .bind(chain_id as i32)
910        .fetch_all(&self.pool)
911        .await?
912        .into_iter()
913        .map(|(address,)| validate_address(&address))
914        .collect::<Result<Vec<_>, _>>()
915        .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
916    }
917
918    /// Loads pool data from the database for the specified chain and DEX.
919    ///
920    /// # Errors
921    ///
922    /// Returns an error if the database query fails, the connection to the database is lost, or the query parameters are invalid.
923    pub async fn load_pools(
924        &self,
925        chain: SharedChain,
926        dex_id: &str,
927    ) -> anyhow::Result<Vec<PoolRow>> {
928        sqlx::query_as::<_, PoolRow>(
929            r"
930            SELECT
931                address,
932                dex_name,
933                creation_block,
934                token0_chain,
935                token0_address,
936                token1_chain,
937                token1_address,
938                fee,
939                tick_spacing,
940                initial_tick,
941                initial_sqrt_price_x96
942            FROM pool
943            WHERE chain_id = $1 AND dex_name = $2
944            ORDER BY creation_block ASC
945        ",
946        )
947        .bind(chain.chain_id as i32)
948        .bind(dex_id)
949        .fetch_all(&self.pool)
950        .await
951        .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
952    }
953
954    /// Toggles performance optimization settings for sync operations.
955    ///
956    /// When enabled (true), applies settings for maximum write performance:
957    /// - `synchronous_commit` = OFF
958    /// - `work_mem` increased for bulk operations
959    ///
960    /// When disabled (false), restores default safe settings:
961    /// - `synchronous_commit` = ON (data safety)
962    /// - `work_mem` back to default
963    ///
964    /// # Errors
965    ///
966    /// Returns an error if the database operations fail.
967    pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
968        if enable {
969            tracing::info!("Enabling performance sync settings for bulk operations");
970
971            // Set synchronous_commit to OFF for maximum write performance
972            sqlx::query("SET synchronous_commit = OFF")
973                .execute(&self.pool)
974                .await
975                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
976
977            // Increase work_mem for bulk operations
978            sqlx::query("SET work_mem = '256MB'")
979                .execute(&self.pool)
980                .await
981                .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
982
983            tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
984        } else {
985            tracing::info!("Restoring default safe database performance settings");
986
987            // Restore synchronous_commit to ON for data safety
988            sqlx::query("SET synchronous_commit = ON")
989                .execute(&self.pool)
990                .await
991                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
992
993            // Reset work_mem to default
994            sqlx::query("RESET work_mem")
995                .execute(&self.pool)
996                .await
997                .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
998        }
999
1000        Ok(())
1001    }
1002
1003    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
1004    ///
1005    /// # Errors
1006    ///
1007    /// Returns an error if the database operation fails.
1008    pub async fn update_dex_last_synced_block(
1009        &self,
1010        chain_id: u32,
1011        dex: &DexType,
1012        block_number: u64,
1013    ) -> anyhow::Result<()> {
1014        sqlx::query(
1015            r"
1016            UPDATE dex
1017            SET last_full_sync_pools_block_number = $3
1018            WHERE chain_id = $1 AND name = $2
1019            ",
1020        )
1021        .bind(chain_id as i32)
1022        .bind(dex.to_string())
1023        .bind(block_number as i64)
1024        .execute(&self.pool)
1025        .await
1026        .map(|_| ())
1027        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1028    }
1029
1030    pub async fn update_pool_last_synced_block(
1031        &self,
1032        chain_id: u32,
1033        dex: &DexType,
1034        pool_address: &Address,
1035        block_number: u64,
1036    ) -> anyhow::Result<()> {
1037        sqlx::query(
1038            r"
1039            UPDATE pool
1040            SET last_full_sync_block_number = $4
1041            WHERE chain_id = $1
1042            AND dex_name = $2
1043            AND address = $3
1044            ",
1045        )
1046        .bind(chain_id as i32)
1047        .bind(dex.to_string())
1048        .bind(pool_address.to_string())
1049        .bind(block_number as i64)
1050        .execute(&self.pool)
1051        .await
1052        .map(|_| ())
1053        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1054    }
1055
1056    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
1057    ///
1058    /// # Errors
1059    ///
1060    /// Returns an error if the database query fails.
1061    pub async fn get_dex_last_synced_block(
1062        &self,
1063        chain_id: u32,
1064        dex: &DexType,
1065    ) -> anyhow::Result<Option<u64>> {
1066        let result = sqlx::query_as::<_, (Option<i64>,)>(
1067            r#"
1068            SELECT
1069                last_full_sync_pools_block_number
1070            FROM dex
1071            WHERE chain_id = $1
1072            AND name = $2
1073            "#,
1074        )
1075        .bind(chain_id as i32)
1076        .bind(dex.to_string())
1077        .fetch_optional(&self.pool)
1078        .await
1079        .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1080
1081        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1082    }
1083
1084    pub async fn get_pool_last_synced_block(
1085        &self,
1086        chain_id: u32,
1087        dex: &DexType,
1088        pool_address: &Address,
1089    ) -> anyhow::Result<Option<u64>> {
1090        let result = sqlx::query_as::<_, (Option<i64>,)>(
1091            r#"
1092            SELECT
1093                last_full_sync_block_number
1094            FROM pool
1095            WHERE chain_id = $1
1096            AND dex_name = $2
1097            AND address = $3
1098            "#,
1099        )
1100        .bind(chain_id as i32)
1101        .bind(dex.to_string())
1102        .bind(pool_address.to_string())
1103        .fetch_optional(&self.pool)
1104        .await
1105        .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1106
1107        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1108    }
1109
1110    /// Retrieves the maximum block number from a specific table for a given pool.
1111    /// This is useful to detect orphaned data where events were inserted but progress wasn't updated.
1112    ///
1113    /// # Errors
1114    ///
1115    /// Returns an error if the database query fails.
1116    pub async fn get_table_last_block(
1117        &self,
1118        chain_id: u32,
1119        table_name: &str,
1120        pool_address: &Address,
1121    ) -> anyhow::Result<Option<u64>> {
1122        let query = format!(
1123            "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
1124            table_name
1125        );
1126        let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1127            .bind(chain_id as i32)
1128            .bind(pool_address.to_string())
1129            .fetch_optional(&self.pool)
1130            .await
1131            .map_err(|e| {
1132                anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
1133            })?;
1134
1135        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1136    }
1137
1138    /// Adds a batch of pool fee collect events to the database using batch operations.
1139    ///
1140    /// # Errors
1141    ///
1142    /// Returns an error if the database operation fails.
1143    pub async fn add_pool_collects_batch(
1144        &self,
1145        chain_id: u32,
1146        collects: &[PoolFeeCollect],
1147    ) -> anyhow::Result<()> {
1148        if collects.is_empty() {
1149            return Ok(());
1150        }
1151
1152        // Prepare vectors for each column
1153        let len = collects.len();
1154        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1155        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1156        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1157        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1158        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1159        let mut owners: Vec<String> = Vec::with_capacity(len);
1160        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1161        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1162        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1163        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1164        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1165
1166        // Fill vectors from collects
1167        for collect in collects {
1168            chain_ids.push(chain_id as i32);
1169            pool_addresses.push(collect.pool_address.to_string());
1170            blocks.push(collect.block as i64);
1171            transaction_hashes.push(collect.transaction_hash.clone());
1172            transaction_indices.push(collect.transaction_index as i32);
1173            log_indices.push(collect.log_index as i32);
1174            owners.push(collect.owner.to_string());
1175            amount0s.push(collect.amount0.to_string());
1176            amount1s.push(collect.amount1.to_string());
1177            tick_lowers.push(collect.tick_lower);
1178            tick_uppers.push(collect.tick_upper);
1179        }
1180
1181        // Execute batch insert with UNNEST
1182        sqlx::query(
1183            r"
1184            INSERT INTO pool_collect_event (
1185                chain_id, pool_address, block, transaction_hash, transaction_index,
1186                log_index, owner, amount0, amount1, tick_lower, tick_upper
1187            )
1188            SELECT
1189                chain_id, pool_address, block, transaction_hash, transaction_index,
1190                log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1191            FROM UNNEST(
1192                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1193                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::INT[], $11::INT[]
1194            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1195                   log_index, owner, amount0, amount1, tick_lower, tick_upper)
1196            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1197           ",
1198        )
1199        .bind(&chain_ids[..])
1200        .bind(&pool_addresses[..])
1201        .bind(&blocks[..])
1202        .bind(&transaction_hashes[..])
1203        .bind(&transaction_indices[..])
1204        .bind(&log_indices[..])
1205        .bind(&owners[..])
1206        .bind(&amount0s[..])
1207        .bind(&amount1s[..])
1208        .bind(&tick_lowers[..])
1209        .bind(&tick_uppers[..])
1210        .execute(&self.pool)
1211        .await
1212        .map(|_| ())
1213        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1214    }
1215
1216    /// Inserts multiple pool flash events in a single database operation using UNNEST for optimal performance.
1217    ///
1218    /// # Errors
1219    ///
1220    /// Returns an error if the database operation fails.
1221    pub async fn add_pool_flash_batch(
1222        &self,
1223        chain_id: u32,
1224        flash_events: &[PoolFlash],
1225    ) -> anyhow::Result<()> {
1226        if flash_events.is_empty() {
1227            return Ok(());
1228        }
1229
1230        // Prepare vectors for each column
1231        let len = flash_events.len();
1232        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1233        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1234        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1235        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1236        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1237        let mut senders: Vec<String> = Vec::with_capacity(len);
1238        let mut recipients: Vec<String> = Vec::with_capacity(len);
1239        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1240        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1241        let mut paid0s: Vec<String> = Vec::with_capacity(len);
1242        let mut paid1s: Vec<String> = Vec::with_capacity(len);
1243        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1244
1245        // Fill vectors from flash events
1246        for flash in flash_events {
1247            chain_ids.push(chain_id as i32);
1248            pool_addresses.push(flash.pool_address.to_string());
1249            blocks.push(flash.block as i64);
1250            transaction_hashes.push(flash.transaction_hash.clone());
1251            transaction_indices.push(flash.transaction_index as i32);
1252            log_indices.push(flash.log_index as i32);
1253            senders.push(flash.sender.to_string());
1254            recipients.push(flash.recipient.to_string());
1255            amount0s.push(flash.amount0.to_string());
1256            amount1s.push(flash.amount1.to_string());
1257            paid0s.push(flash.paid0.to_string());
1258            paid1s.push(flash.paid1.to_string());
1259        }
1260
1261        // Execute batch insert with UNNEST
1262        sqlx::query(
1263            r"
1264            INSERT INTO pool_flash_event (
1265                chain_id, pool_address, block, transaction_hash, transaction_index,
1266                log_index, sender, recipient, amount0, amount1, paid0, paid1
1267            )
1268            SELECT
1269                chain_id, pool_address, block, transaction_hash, transaction_index,
1270                log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1271            FROM UNNEST(
1272                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1273                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[]
1274            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1275                   log_index, sender, recipient, amount0, amount1, paid0, paid1)
1276            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1277           ",
1278        )
1279        .bind(&chain_ids[..])
1280        .bind(&pool_addresses[..])
1281        .bind(&blocks[..])
1282        .bind(&transaction_hashes[..])
1283        .bind(&transaction_indices[..])
1284        .bind(&log_indices[..])
1285        .bind(&senders[..])
1286        .bind(&recipients[..])
1287        .bind(&amount0s[..])
1288        .bind(&amount1s[..])
1289        .bind(&paid0s[..])
1290        .bind(&paid1s[..])
1291        .execute(&self.pool)
1292        .await
1293        .map(|_| ())
1294        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1295    }
1296
1297    pub async fn add_pool_snapshot(
1298        &self,
1299        chain_id: u32,
1300        pool_address: &Address,
1301        snapshot: &PoolSnapshot,
1302    ) -> anyhow::Result<()> {
1303        sqlx::query(
1304            r"
1305            INSERT INTO pool_snapshot (
1306                chain_id, pool_address, block, transaction_index, log_index, transaction_hash,
1307                current_tick, price_sqrt_ratio_x96, liquidity,
1308                protocol_fees_token0, protocol_fees_token1, fee_protocol,
1309                fee_growth_global_0, fee_growth_global_1,
1310                total_amount0_deposited, total_amount1_deposited,
1311                total_amount0_collected, total_amount1_collected,
1312                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes
1313            ) VALUES (
1314                $1, $2, $3, $4, $5, $6,
1315                $7, $8::U160, $9::U128, $10::U256, $11::U256, $12,
1316                $13::U256, $14::U256, $15::U256, $16::U256, $17::U256, $18::U256,
1317                $19, $20, $21, $22, $23
1318            )
1319            ON CONFLICT (chain_id, pool_address, block, transaction_index, log_index)
1320            DO NOTHING
1321            ",
1322        )
1323        .bind(chain_id as i32)
1324        .bind(pool_address.to_string())
1325        .bind(snapshot.block_position.number as i64)
1326        .bind(snapshot.block_position.transaction_index as i32)
1327        .bind(snapshot.block_position.log_index as i32)
1328        .bind(snapshot.block_position.transaction_hash.to_string())
1329        .bind(snapshot.state.current_tick)
1330        .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1331        .bind(snapshot.state.liquidity.to_string())
1332        .bind(snapshot.state.protocol_fees_token0.to_string())
1333        .bind(snapshot.state.protocol_fees_token1.to_string())
1334        .bind(snapshot.state.fee_protocol as i16)
1335        .bind(snapshot.state.fee_growth_global_0.to_string())
1336        .bind(snapshot.state.fee_growth_global_1.to_string())
1337        .bind(snapshot.analytics.total_amount0_deposited.to_string())
1338        .bind(snapshot.analytics.total_amount1_deposited.to_string())
1339        .bind(snapshot.analytics.total_amount0_collected.to_string())
1340        .bind(snapshot.analytics.total_amount1_collected.to_string())
1341        .bind(snapshot.analytics.total_swaps as i32)
1342        .bind(snapshot.analytics.total_mints as i32)
1343        .bind(snapshot.analytics.total_burns as i32)
1344        .bind(snapshot.analytics.total_fee_collects as i32)
1345        .bind(snapshot.analytics.total_flashes as i32)
1346        .execute(&self.pool)
1347        .await
1348        .map(|_| ())
1349        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1350    }
1351
1352    /// Inserts multiple pool positions in a single database operation using UNNEST for optimal performance.
1353    ///
1354    /// # Errors
1355    ///
1356    /// Returns an error if the database operation fails.
1357    pub async fn add_pool_positions_batch(
1358        &self,
1359        chain_id: u32,
1360        snapshot_block: u64,
1361        snapshot_transaction_index: u32,
1362        snapshot_log_index: u32,
1363        positions: &[(Address, PoolPosition)],
1364    ) -> anyhow::Result<()> {
1365        if positions.is_empty() {
1366            return Ok(());
1367        }
1368
1369        // Prepare vectors for each column
1370        let len = positions.len();
1371        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1372        let mut owners: Vec<String> = Vec::with_capacity(len);
1373        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1374        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1375        let mut liquidities: Vec<String> = Vec::with_capacity(len);
1376        let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1377        let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1378        let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1379        let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1380        let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1381        let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1382        let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1383        let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1384
1385        // Fill vectors from positions
1386        for (pool_address, position) in positions {
1387            pool_addresses.push(pool_address.to_string());
1388            owners.push(position.owner.to_string());
1389            tick_lowers.push(position.tick_lower);
1390            tick_uppers.push(position.tick_upper);
1391            liquidities.push(position.liquidity.to_string());
1392            fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1393            fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1394            tokens_owed_0s.push(position.tokens_owed_0.to_string());
1395            tokens_owed_1s.push(position.tokens_owed_1.to_string());
1396            total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1397            total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1398            total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1399            total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1400        }
1401
1402        // Execute batch insert with UNNEST
1403        sqlx::query(
1404            r"
1405            INSERT INTO pool_position (
1406                chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1407                owner, tick_lower, tick_upper,
1408                liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1409                tokens_owed_0, tokens_owed_1,
1410                total_amount0_deposited, total_amount1_deposited,
1411                total_amount0_collected, total_amount1_collected
1412            )
1413            SELECT
1414                $1, pool_address, $2, $3, $4,
1415                owner, tick_lower, tick_upper,
1416                liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1417                tokens_owed_0::U128, tokens_owed_1::U128,
1418                total_amount0_deposited::U256, total_amount1_deposited::U256,
1419                total_amount0_collected::U128, total_amount1_collected::U128
1420            FROM UNNEST(
1421                $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1422                $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1423                $16::TEXT[], $17::TEXT[]
1424            ) AS t(pool_address, owner, tick_lower, tick_upper,
1425                   liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1426                   tokens_owed_0, tokens_owed_1,
1427                   total_amount0_deposited, total_amount1_deposited,
1428                   total_amount0_collected, total_amount1_collected)
1429            ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1430            DO NOTHING
1431           ",
1432        )
1433        .bind(chain_id as i32)
1434        .bind(snapshot_block as i64)
1435        .bind(snapshot_transaction_index as i32)
1436        .bind(snapshot_log_index as i32)
1437        .bind(&pool_addresses[..])
1438        .bind(&owners[..])
1439        .bind(&tick_lowers[..])
1440        .bind(&tick_uppers[..])
1441        .bind(&liquidities[..])
1442        .bind(&fee_growth_inside_0_lasts[..])
1443        .bind(&fee_growth_inside_1_lasts[..])
1444        .bind(&tokens_owed_0s[..])
1445        .bind(&tokens_owed_1s[..])
1446        .bind(&total_amount0_depositeds as &[Option<String>])
1447        .bind(&total_amount1_depositeds as &[Option<String>])
1448        .bind(&total_amount0_collecteds as &[Option<String>])
1449        .bind(&total_amount1_collecteds as &[Option<String>])
1450        .execute(&self.pool)
1451        .await
1452        .map(|_| ())
1453        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1454    }
1455
1456    /// Inserts multiple pool ticks in a single database operation using UNNEST for optimal performance.
1457    ///
1458    /// # Errors
1459    ///
1460    /// Returns an error if the database operation fails.
1461    pub async fn add_pool_ticks_batch(
1462        &self,
1463        chain_id: u32,
1464        snapshot_block: u64,
1465        snapshot_transaction_index: u32,
1466        snapshot_log_index: u32,
1467        ticks: &[(Address, &PoolTick)],
1468    ) -> anyhow::Result<()> {
1469        if ticks.is_empty() {
1470            return Ok(());
1471        }
1472
1473        // Prepare vectors for each column
1474        let len = ticks.len();
1475        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1476        let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1477        let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1478        let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1479        let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1480        let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1481        let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1482        let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1483
1484        // Fill vectors from ticks
1485        for (pool_address, tick) in ticks {
1486            pool_addresses.push(pool_address.to_string());
1487            tick_values.push(tick.value);
1488            liquidity_grosses.push(tick.liquidity_gross.to_string());
1489            liquidity_nets.push(tick.liquidity_net.to_string());
1490            fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1491            fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1492            initializeds.push(tick.initialized);
1493            last_updated_blocks.push(tick.last_updated_block as i64);
1494        }
1495
1496        // Execute batch insert with UNNEST
1497        sqlx::query(
1498            r"
1499            INSERT INTO pool_tick (
1500                chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1501                tick_value, liquidity_gross, liquidity_net,
1502                fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1503            )
1504            SELECT
1505                $1, pool_address, $2, $3, $4,
1506                tick_value, liquidity_gross::U128, liquidity_net::I128,
1507                fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1508            FROM UNNEST(
1509                $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1510                $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1511            ) AS t(pool_address, tick_value, liquidity_gross, liquidity_net,
1512                   fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1513            ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1514            DO NOTHING
1515           ",
1516        )
1517        .bind(chain_id as i32)
1518        .bind(snapshot_block as i64)
1519        .bind(snapshot_transaction_index as i32)
1520        .bind(snapshot_log_index as i32)
1521        .bind(&pool_addresses[..])
1522        .bind(&tick_values[..])
1523        .bind(&liquidity_grosses[..])
1524        .bind(&liquidity_nets[..])
1525        .bind(&fee_growth_outside_0s[..])
1526        .bind(&fee_growth_outside_1s[..])
1527        .bind(&initializeds[..])
1528        .bind(&last_updated_blocks[..])
1529        .execute(&self.pool)
1530        .await
1531        .map(|_| ())
1532        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1533    }
1534
1535    pub async fn update_pool_initial_price_tick(
1536        &self,
1537        chain_id: u32,
1538        initialize_event: &InitializeEvent,
1539    ) -> anyhow::Result<()> {
1540        sqlx::query(
1541            r"
1542            UPDATE pool
1543            SET
1544                initial_tick = $4,
1545                initial_sqrt_price_x96 = $5
1546            WHERE chain_id = $1
1547            AND dex_name = $2
1548            AND address = $3
1549            ",
1550        )
1551        .bind(chain_id as i32)
1552        .bind(initialize_event.dex.name.to_string())
1553        .bind(initialize_event.pool_address.to_string())
1554        .bind(initialize_event.tick)
1555        .bind(initialize_event.sqrt_price_x96.to_string())
1556        .execute(&self.pool)
1557        .await
1558        .map(|_| ())
1559        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1560    }
1561
1562    /// Loads the latest valid pool snapshot from the database.
1563    ///
1564    /// Returns the most recent snapshot that has been validated against on-chain state.
1565    ///
1566    /// # Errors
1567    ///
1568    /// Returns an error if the database query fails.
1569    pub async fn load_latest_valid_pool_snapshot(
1570        &self,
1571        chain_id: u32,
1572        pool_address: &Address,
1573    ) -> anyhow::Result<Option<PoolSnapshot>> {
1574        let result = sqlx::query(
1575            r"
1576            SELECT
1577                block, transaction_index, log_index, transaction_hash,
1578                current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1579                protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1580                fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1581                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1582                total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1583                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1584                (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1585            FROM pool_snapshot
1586            WHERE chain_id = $1 AND pool_address = $2 AND is_valid = TRUE
1587            ORDER BY block DESC, transaction_index DESC, log_index DESC
1588            LIMIT 1
1589            ",
1590        )
1591        .bind(chain_id as i32)
1592        .bind(pool_address.to_string())
1593        .fetch_optional(&self.pool)
1594        .await
1595        .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {}", e))?;
1596
1597        if let Some(row) = result {
1598            // Parse snapshot state
1599            let block: i64 = row.get("block");
1600            let transaction_index: i32 = row.get("transaction_index");
1601            let log_index: i32 = row.get("log_index");
1602            let transaction_hash: String = row.get("transaction_hash");
1603
1604            let block_position = nautilus_model::defi::data::block::BlockPosition::new(
1605                block as u64,
1606                transaction_hash,
1607                transaction_index as u32,
1608                log_index as u32,
1609            );
1610
1611            let state = PoolState {
1612                current_tick: row.get("current_tick"),
1613                price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1614                liquidity: row.get::<String, _>("liquidity").parse()?,
1615                protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1616                protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1617                fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1618                fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1619                fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1620            };
1621
1622            let analytics = PoolAnalytics {
1623                total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1624                total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1625                total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1626                total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1627                total_swaps: row.get::<i32, _>("total_swaps") as u64,
1628                total_mints: row.get::<i32, _>("total_mints") as u64,
1629                total_burns: row.get::<i32, _>("total_burns") as u64,
1630                total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1631                total_flashes: row.get::<i32, _>("total_flashes") as u64,
1632                #[cfg(debug_assertions)]
1633                swap_processing_time: std::time::Duration::ZERO,
1634                #[cfg(debug_assertions)]
1635                mint_processing_time: std::time::Duration::ZERO,
1636                #[cfg(debug_assertions)]
1637                burn_processing_time: std::time::Duration::ZERO,
1638                #[cfg(debug_assertions)]
1639                collect_processing_time: std::time::Duration::ZERO,
1640            };
1641
1642            // Load positions and ticks
1643            let positions = self
1644                .load_pool_positions_for_snapshot(
1645                    chain_id,
1646                    pool_address,
1647                    block as u64,
1648                    transaction_index as u32,
1649                    log_index as u32,
1650                )
1651                .await?;
1652
1653            let ticks = self
1654                .load_pool_ticks_for_snapshot(
1655                    chain_id,
1656                    pool_address,
1657                    block as u64,
1658                    transaction_index as u32,
1659                    log_index as u32,
1660                )
1661                .await?;
1662
1663            let dex_name: String = row.get("dex_name");
1664            let chain = nautilus_model::defi::Chain::from_chain_id(chain_id)
1665                .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {}", chain_id))?;
1666
1667            let dex_type = nautilus_model::defi::DexType::from_dex_name(&dex_name)
1668                .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {}", dex_name))?;
1669
1670            let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1671                .ok_or_else(|| {
1672                    anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1673                })?;
1674
1675            let instrument_id =
1676                Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_address);
1677
1678            Ok(Some(PoolSnapshot::new(
1679                instrument_id,
1680                state,
1681                positions,
1682                ticks,
1683                analytics,
1684                block_position,
1685            )))
1686        } else {
1687            Ok(None)
1688        }
1689    }
1690
1691    /// Marks a pool snapshot as valid after successful on-chain verification.
1692    ///
1693    /// # Errors
1694    ///
1695    /// Returns an error if the database operation fails.
1696    pub async fn mark_pool_snapshot_valid(
1697        &self,
1698        chain_id: u32,
1699        pool_address: &Address,
1700        block: u64,
1701        transaction_index: u32,
1702        log_index: u32,
1703    ) -> anyhow::Result<()> {
1704        sqlx::query(
1705            r"
1706            UPDATE pool_snapshot
1707            SET is_valid = TRUE
1708            WHERE chain_id = $1
1709            AND pool_address = $2
1710            AND block = $3
1711            AND transaction_index = $4
1712            AND log_index = $5
1713            ",
1714        )
1715        .bind(chain_id as i32)
1716        .bind(pool_address.to_string())
1717        .bind(block as i64)
1718        .bind(transaction_index as i32)
1719        .bind(log_index as i32)
1720        .execute(&self.pool)
1721        .await
1722        .map(|_| ())
1723        .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {}", e))
1724    }
1725
1726    /// Loads all positions for a specific snapshot.
1727    ///
1728    /// # Errors
1729    ///
1730    /// Returns an error if the database query fails.
1731    pub async fn load_pool_positions_for_snapshot(
1732        &self,
1733        chain_id: u32,
1734        pool_address: &Address,
1735        snapshot_block: u64,
1736        snapshot_transaction_index: u32,
1737        snapshot_log_index: u32,
1738    ) -> anyhow::Result<Vec<PoolPosition>> {
1739        let rows = sqlx::query(
1740            r"
1741            SELECT
1742                owner, tick_lower, tick_upper,
1743                liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1744                tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1745                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1746                total_amount0_collected::TEXT, total_amount1_collected::TEXT
1747            FROM pool_position
1748            WHERE chain_id = $1
1749            AND pool_address = $2
1750            AND snapshot_block = $3
1751            AND snapshot_transaction_index = $4
1752            AND snapshot_log_index = $5
1753            ",
1754        )
1755        .bind(chain_id as i32)
1756        .bind(pool_address.to_string())
1757        .bind(snapshot_block as i64)
1758        .bind(snapshot_transaction_index as i32)
1759        .bind(snapshot_log_index as i32)
1760        .fetch_all(&self.pool)
1761        .await
1762        .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {}", e))?;
1763
1764        rows.iter()
1765            .map(|row| {
1766                let owner: String = row.get("owner");
1767                let position = PoolPosition {
1768                    owner: validate_address(&owner)?,
1769                    tick_lower: row.get("tick_lower"),
1770                    tick_upper: row.get("tick_upper"),
1771                    liquidity: row.get::<String, _>("liquidity").parse()?,
1772                    fee_growth_inside_0_last: row
1773                        .get::<String, _>("fee_growth_inside_0_last")
1774                        .parse()?,
1775                    fee_growth_inside_1_last: row
1776                        .get::<String, _>("fee_growth_inside_1_last")
1777                        .parse()?,
1778                    tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1779                    tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1780                    total_amount0_deposited: row
1781                        .get::<String, _>("total_amount0_deposited")
1782                        .parse()?,
1783                    total_amount1_deposited: row
1784                        .get::<String, _>("total_amount1_deposited")
1785                        .parse()?,
1786                    total_amount0_collected: row
1787                        .get::<String, _>("total_amount0_collected")
1788                        .parse()?,
1789                    total_amount1_collected: row
1790                        .get::<String, _>("total_amount1_collected")
1791                        .parse()?,
1792                };
1793                Ok(position)
1794            })
1795            .collect()
1796    }
1797
1798    /// Loads all ticks for a specific snapshot.
1799    ///
1800    /// # Errors
1801    ///
1802    /// Returns an error if the database query fails.
1803    pub async fn load_pool_ticks_for_snapshot(
1804        &self,
1805        chain_id: u32,
1806        pool_address: &Address,
1807        snapshot_block: u64,
1808        snapshot_transaction_index: u32,
1809        snapshot_log_index: u32,
1810    ) -> anyhow::Result<Vec<PoolTick>> {
1811        let rows = sqlx::query(
1812            r"
1813            SELECT
1814                tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1815                fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1816                last_updated_block
1817            FROM pool_tick
1818            WHERE chain_id = $1
1819            AND pool_address = $2
1820            AND snapshot_block = $3
1821            AND snapshot_transaction_index = $4
1822            AND snapshot_log_index = $5
1823            ",
1824        )
1825        .bind(chain_id as i32)
1826        .bind(pool_address.to_string())
1827        .bind(snapshot_block as i64)
1828        .bind(snapshot_transaction_index as i32)
1829        .bind(snapshot_log_index as i32)
1830        .fetch_all(&self.pool)
1831        .await
1832        .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {}", e))?;
1833
1834        rows.iter()
1835            .map(|row| {
1836                let tick = PoolTick::new(
1837                    row.get("tick_value"),
1838                    row.get::<String, _>("liquidity_gross").parse()?,
1839                    row.get::<String, _>("liquidity_net").parse()?,
1840                    row.get::<String, _>("fee_growth_outside_0").parse()?,
1841                    row.get::<String, _>("fee_growth_outside_1").parse()?,
1842                    row.get("initialized"),
1843                    row.get::<i64, _>("last_updated_block") as u64,
1844                );
1845                Ok(tick)
1846            })
1847            .collect()
1848    }
1849
1850    /// Streams pool events from all event tables (swap, liquidity, collect) for a specific pool.
1851    ///
1852    /// Creates a unified stream of pool events from multiple tables, ordering them chronologically
1853    /// by block number, transaction index, and log index. Optionally resumes from a specific block position.
1854    ///
1855    /// # Returns
1856    ///
1857    /// A stream of `DexPoolData` events in chronological order.
1858    ///
1859    /// # Errors
1860    ///
1861    /// Returns an error if the database query fails or if event transformation fails.
1862    pub fn stream_pool_events<'a>(
1863        &'a self,
1864        chain: SharedChain,
1865        dex: SharedDex,
1866        instrument_id: InstrumentId,
1867        pool_address: &Address,
1868        from_position: Option<BlockPosition>,
1869    ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1870        // Query without position filter (streams all events)
1871        const QUERY_ALL: &str = r"
1872            (SELECT
1873                'swap' as event_type,
1874                chain_id,
1875                pool_address,
1876                block,
1877                transaction_hash,
1878                transaction_index,
1879                log_index,
1880                sender,
1881                recipient,
1882                NULL::TEXT as owner,
1883                side,
1884                size,
1885                price,
1886                sqrt_price_x96::TEXT,
1887                liquidity::TEXT as swap_liquidity,
1888                tick as swap_tick,
1889                amount0::TEXT as swap_amount0,
1890                amount1::TEXT as swap_amount1,
1891                NULL::TEXT as position_liquidity,
1892                NULL::TEXT as amount0,
1893                NULL::TEXT as amount1,
1894                NULL::INT as tick_lower,
1895                NULL::INT as tick_upper,
1896                NULL::TEXT as liquidity_event_type,
1897                NULL::TEXT as flash_amount0,
1898                NULL::TEXT as flash_amount1,
1899                NULL::TEXT as flash_paid0,
1900                NULL::TEXT as flash_paid1
1901            FROM pool_swap_event
1902            WHERE chain_id = $1 AND pool_address = $2)
1903            UNION ALL
1904            (SELECT
1905                'liquidity' as event_type,
1906                chain_id,
1907                pool_address,
1908                block,
1909                transaction_hash,
1910                transaction_index,
1911                log_index,
1912                sender,
1913                NULL::TEXT as recipient,
1914                owner,
1915                NULL::TEXT as side,
1916                NULL::TEXT as size,
1917                NULL::text as price,
1918                NULL::text as sqrt_price_x96,
1919                NULL::TEXT as swap_liquidity,
1920                NULL::INT as swap_tick,
1921                amount0::TEXT as swap_amount0,
1922                amount1::TEXT as swap_amount1,
1923                position_liquidity::TEXT,
1924                amount0::TEXT,
1925                amount1::TEXT,
1926                tick_lower::INT,
1927                tick_upper::INT,
1928                event_type as liquidity_event_type,
1929                NULL::TEXT as flash_amount0,
1930                NULL::TEXT as flash_amount1,
1931                NULL::TEXT as flash_paid0,
1932                NULL::TEXT as flash_paid1
1933            FROM pool_liquidity_event
1934            WHERE chain_id = $1 AND pool_address = $2)
1935            UNION ALL
1936            (SELECT
1937                'collect' as event_type,
1938                chain_id,
1939                pool_address,
1940                block,
1941                transaction_hash,
1942                transaction_index,
1943                log_index,
1944                NULL::TEXT as sender,
1945                NULL::TEXT as recipient,
1946                owner,
1947                NULL::TEXT as side,
1948                NULL::TEXT as size,
1949                NULL::TEXT as price,
1950                NULL::TEXT as sqrt_price_x96,
1951                NULL::TEXT as swap_liquidity,
1952                NULL::INT AS swap_tick,
1953                amount0::TEXT as swap_amount0,
1954                amount1::TEXT as swap_amount1,
1955                NULL::TEXT as position_liquidity,
1956                amount0::TEXT,
1957                amount1::TEXT,
1958                tick_lower::INT,
1959                tick_upper::INT,
1960                NULL::TEXT as liquidity_event_type,
1961                NULL::TEXT as flash_amount0,
1962                NULL::TEXT as flash_amount1,
1963                NULL::TEXT as flash_paid0,
1964                NULL::TEXT as flash_paid1
1965            FROM pool_collect_event
1966            WHERE chain_id = $1 AND pool_address = $2)
1967            UNION ALL
1968            (SELECT
1969                'flash' as event_type,
1970                chain_id,
1971                pool_address,
1972                block,
1973                transaction_hash,
1974                transaction_index,
1975                log_index,
1976                sender,
1977                recipient,
1978                NULL::TEXT as owner,
1979                NULL::TEXT as side,
1980                NULL::TEXT as size,
1981                NULL::TEXT as price,
1982                NULL::TEXT as sqrt_price_x96,
1983                NULL::TEXT as swap_liquidity,
1984                NULL::INT AS swap_tick,
1985                NULL::TEXT as swap_amount0,
1986                NULL::TEXT as swap_amount1,
1987                NULL::TEXT as position_liquidity,
1988                NULL::TEXT as amount0,
1989                NULL::TEXT as amount1,
1990                NULL::INT as tick_lower,
1991                NULL::INT as tick_upper,
1992                NULL::TEXT as liquidity_event_type,
1993                amount0::TEXT as flash_amount0,
1994                amount1::TEXT as flash_amount1,
1995                paid0::TEXT as flash_paid0,
1996                paid1::TEXT as flash_paid1
1997            FROM pool_flash_event
1998            WHERE chain_id = $1 AND pool_address = $2)
1999            ORDER BY block, transaction_index, log_index";
2000
2001        // Query with position filter (resumes from specific block position)
2002        const QUERY_FROM_POSITION: &str = r"
2003            (SELECT
2004                'swap' as event_type,
2005                chain_id,
2006                pool_address,
2007                block,
2008                transaction_hash,
2009                transaction_index,
2010                log_index,
2011                sender,
2012                recipient,
2013                NULL::TEXT as owner,
2014                side,
2015                size,
2016                price,
2017                sqrt_price_x96::TEXT,
2018                liquidity::TEXT as swap_liquidity,
2019                tick as swap_tick,
2020                amount0::TEXT as swap_amount0,
2021                amount1::TEXT as swap_amount1,
2022                NULL::TEXT as position_liquidity,
2023                NULL::TEXT as amount0,
2024                NULL::TEXT as amount1,
2025                NULL::INT as tick_lower,
2026                NULL::INT as tick_upper,
2027                NULL::TEXT as liquidity_event_type,
2028                NULL::TEXT as flash_amount0,
2029                NULL::TEXT as flash_amount1,
2030                NULL::TEXT as flash_paid0,
2031                NULL::TEXT as flash_paid1
2032            FROM pool_swap_event
2033            WHERE chain_id = $1 AND pool_address = $2
2034            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2035            UNION ALL
2036            (SELECT
2037                'liquidity' as event_type,
2038                chain_id,
2039                pool_address,
2040                block,
2041                transaction_hash,
2042                transaction_index,
2043                log_index,
2044                sender,
2045                NULL::TEXT as recipient,
2046                owner,
2047                NULL::TEXT as side,
2048                NULL::TEXT as size,
2049                NULL::text as price,
2050                NULL::text as sqrt_price_x96,
2051                NULL::TEXT as swap_liquidity,
2052                NULL::INT as swap_tick,
2053                amount0::TEXT as swap_amount0,
2054                amount1::TEXT as swap_amount1,
2055                position_liquidity::TEXT,
2056                amount0::TEXT,
2057                amount1::TEXT,
2058                tick_lower::INT,
2059                tick_upper::INT,
2060                event_type as liquidity_event_type,
2061                NULL::TEXT as flash_amount0,
2062                NULL::TEXT as flash_amount1,
2063                NULL::TEXT as flash_paid0,
2064                NULL::TEXT as flash_paid1
2065            FROM pool_liquidity_event
2066            WHERE chain_id = $1 AND pool_address = $2
2067            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2068            UNION ALL
2069            (SELECT
2070                'collect' as event_type,
2071                chain_id,
2072                pool_address,
2073                block,
2074                transaction_hash,
2075                transaction_index,
2076                log_index,
2077                NULL::TEXT as sender,
2078                NULL::TEXT as recipient,
2079                owner,
2080                NULL::TEXT as side,
2081                NULL::TEXT as size,
2082                NULL::TEXT as price,
2083                NULL::TEXT as sqrt_price_x96,
2084                NULL::TEXT as swap_liquidity,
2085                NULL::INT AS swap_tick,
2086                amount0::TEXT as swap_amount0,
2087                amount1::TEXT as swap_amount1,
2088                NULL::TEXT as position_liquidity,
2089                amount0::TEXT,
2090                amount1::TEXT,
2091                tick_lower::INT,
2092                tick_upper::INT,
2093                NULL::TEXT as liquidity_event_type,
2094                NULL::TEXT as flash_amount0,
2095                NULL::TEXT as flash_amount1,
2096                NULL::TEXT as flash_paid0,
2097                NULL::TEXT as flash_paid1
2098            FROM pool_collect_event
2099            WHERE chain_id = $1 AND pool_address = $2
2100            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2101            UNION ALL
2102            (SELECT
2103                'flash' as event_type,
2104                chain_id,
2105                pool_address,
2106                block,
2107                transaction_hash,
2108                transaction_index,
2109                log_index,
2110                sender,
2111                recipient,
2112                NULL::TEXT as owner,
2113                NULL::TEXT as side,
2114                NULL::TEXT as size,
2115                NULL::TEXT as price,
2116                NULL::TEXT as sqrt_price_x96,
2117                NULL::TEXT as swap_liquidity,
2118                NULL::INT AS swap_tick,
2119                NULL::TEXT as swap_amount0,
2120                NULL::TEXT as swap_amount1,
2121                NULL::TEXT as position_liquidity,
2122                NULL::TEXT as amount0,
2123                NULL::TEXT as amount1,
2124                NULL::INT as tick_lower,
2125                NULL::INT as tick_upper,
2126                NULL::TEXT as liquidity_event_type,
2127                amount0::TEXT as flash_amount0,
2128                amount1::TEXT as flash_amount1,
2129                paid0::TEXT as flash_paid0,
2130                paid1::TEXT as flash_paid1
2131            FROM pool_flash_event
2132            WHERE chain_id = $1 AND pool_address = $2
2133            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2134            ORDER BY block, transaction_index, log_index";
2135
2136        // Build query with appropriate bindings
2137        let query = if let Some(pos) = from_position {
2138            sqlx::query(QUERY_FROM_POSITION)
2139                .bind(chain.chain_id as i32)
2140                .bind(pool_address.to_string())
2141                .bind(pos.number as i64)
2142                .bind(pos.transaction_index as i32)
2143                .bind(pos.log_index as i32)
2144                .fetch(&self.pool)
2145        } else {
2146            sqlx::query(QUERY_ALL)
2147                .bind(chain.chain_id as i32)
2148                .bind(pool_address.to_string())
2149                .fetch(&self.pool)
2150        };
2151
2152        // Transform rows to events
2153        let stream = query.map(move |row_result| match row_result {
2154            Ok(row) => {
2155                transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2156                    .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e))
2157            }
2158            Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {}", e)),
2159        });
2160
2161        Box::pin(stream)
2162    }
2163}