nautilus_blockchain/cache/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::pin::Pin;
17
18use alloy::primitives::{Address, U256};
19use futures_util::{Stream, StreamExt};
20use nautilus_model::{
21    defi::{
22        Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
23        data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24        pool_analysis::{
25            position::PoolPosition,
26            snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
27        },
28        tick_map::tick::PoolTick,
29        validation::validate_address,
30    },
31    identifiers::InstrumentId,
32};
33use rust_decimal::Decimal;
34use sqlx::{PgPool, Row, postgres::PgConnectOptions};
35
36use crate::{
37    cache::{
38        consistency::CachedBlocksConsistencyStatus,
39        copy::PostgresCopyHandler,
40        rows::{BlockTimestampRow, PoolRow, TokenRow, transform_row_to_dex_pool_data},
41        types::{U128Pg, U256Pg},
42    },
43    events::initialize::InitializeEvent,
44};
45
46/// Database interface for persisting and retrieving blockchain entities and domain objects.
47#[derive(Debug)]
48pub struct BlockchainCacheDatabase {
49    /// PostgreSQL connection pool used for database operations.
50    pool: PgPool,
51}
52
53impl BlockchainCacheDatabase {
54    /// Initializes a new database instance by establishing a connection to PostgreSQL.
55    ///
56    /// # Panics
57    ///
58    /// Panics if unable to connect to PostgreSQL with the provided options.
59    pub async fn init(pg_options: PgConnectOptions) -> Self {
60        let pool = sqlx::postgres::PgPoolOptions::new()
61            .max_connections(32) // Increased from default 10
62            .min_connections(5) // Keep some connections warm
63            .acquire_timeout(std::time::Duration::from_secs(3))
64            .connect_with(pg_options)
65            .await
66            .expect("Error connecting to Postgres");
67        Self { pool }
68    }
69
70    /// Seeds the database with a blockchain chain record.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the database operation fails.
75    pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
76        sqlx::query(
77            r"
78            INSERT INTO chain (
79                chain_id, name
80            ) VALUES ($1,$2)
81            ON CONFLICT (chain_id)
82            DO NOTHING
83        ",
84        )
85        .bind(chain.chain_id as i32)
86        .bind(chain.name.to_string())
87        .execute(&self.pool)
88        .await
89        .map(|_| ())
90        .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
91    }
92
93    /// Creates a table partition for the block table specific to the given chain
94    /// by calling the existing PostgreSQL function `create_block_partition`.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the database operation fails.
99    pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
100        let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
101            .bind(chain.chain_id as i32)
102            .fetch_one(&self.pool)
103            .await
104            .map_err(|e| {
105                anyhow::anyhow!(
106                    "Failed to call create_block_partition for chain {}: {e}",
107                    chain.chain_id
108                )
109            })?;
110
111        Ok(result.0)
112    }
113
114    /// Creates a table partition for the token table specific to the given chain
115    /// by calling the existing PostgreSQL function `create_token_partition`.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the database operation fails.
120    pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
121        let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
122            .bind(chain.chain_id as i32)
123            .fetch_one(&self.pool)
124            .await
125            .map_err(|e| {
126                anyhow::anyhow!(
127                    "Failed to call create_token_partition for chain {}: {e}",
128                    chain.chain_id
129                )
130            })?;
131
132        Ok(result.0)
133    }
134
135    /// Returns the highest block number that maintains data continuity in the database.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the database query fails.
140    pub async fn get_block_consistency_status(
141        &self,
142        chain: &Chain,
143    ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
144        tracing::info!("Fetching block consistency status");
145
146        let result: (i64, i64) = sqlx::query_as(
147            r"
148            SELECT
149                COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
150                get_last_continuous_block($1) as last_continuous_block
151            "
152        )
153        .bind(chain.chain_id as i32)
154        .fetch_one(&self.pool)
155        .await
156        .map_err(|e| {
157            anyhow::anyhow!(
158                "Failed to get block info for chain {}: {}",
159                chain.chain_id,
160                e
161            )
162        })?;
163
164        Ok(CachedBlocksConsistencyStatus::new(
165            result.0 as u64,
166            result.1 as u64,
167        ))
168    }
169
170    /// Inserts or updates a block record in the database.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the database operation fails.
175    pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
176        sqlx::query(
177            r"
178            INSERT INTO block (
179                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
180                base_fee_per_gas, blob_gas_used, excess_blob_gas,
181                l1_gas_price, l1_gas_used, l1_fee_scalar
182            ) VALUES (
183                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
184            )
185            ON CONFLICT (chain_id, number)
186            DO UPDATE
187            SET
188                hash = $3,
189                parent_hash = $4,
190                miner = $5,
191                gas_limit = $6,
192                gas_used = $7,
193                timestamp = $8,
194                base_fee_per_gas = $9,
195                blob_gas_used = $10,
196                excess_blob_gas = $11,
197                l1_gas_price = $12,
198                l1_gas_used = $13,
199                l1_fee_scalar = $14
200        ",
201        )
202        .bind(chain_id as i32)
203        .bind(block.number as i64)
204        .bind(block.hash.as_str())
205        .bind(block.parent_hash.as_str())
206        .bind(block.miner.as_str())
207        .bind(block.gas_limit as i64)
208        .bind(block.gas_used as i64)
209        .bind(block.timestamp.to_string())
210        .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
211        .bind(block.blob_gas_used.as_ref().map(U256::to_string))
212        .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
213        .bind(block.l1_gas_price.as_ref().map(U256::to_string))
214        .bind(block.l1_gas_used.map(|v| v as i64))
215        .bind(block.l1_fee_scalar.map(|v| v as i64))
216        .execute(&self.pool)
217        .await
218        .map(|_| ())
219        .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
220    }
221
222    /// Inserts multiple blocks in a single database operation using UNNEST for optimal performance.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if the database operation fails.
227    pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
228        if blocks.is_empty() {
229            return Ok(());
230        }
231
232        // Prepare vectors for each column
233        let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
234        let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
235        let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
236        let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
237        let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
238        let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
239        let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
240        let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
241        let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
242        let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
243        let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
244        let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
245        let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
246
247        // Fill vectors from blocks
248        for block in blocks {
249            numbers.push(block.number as i64);
250            hashes.push(block.hash.clone());
251            parent_hashes.push(block.parent_hash.clone());
252            miners.push(block.miner.to_string());
253            gas_limits.push(block.gas_limit as i64);
254            gas_useds.push(block.gas_used as i64);
255            timestamps.push(block.timestamp.to_string());
256            base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
257            blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
258            excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
259            l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
260            l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
261            l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
262        }
263
264        // Execute batch insert with UNNEST
265        sqlx::query(
266            r"
267            INSERT INTO block (
268                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
269                base_fee_per_gas, blob_gas_used, excess_blob_gas,
270                l1_gas_price, l1_gas_used, l1_fee_scalar
271            )
272            SELECT
273                $1, *
274            FROM UNNEST(
275                $2::int8[], $3::text[], $4::text[], $5::text[],
276                $6::int8[], $7::int8[], $8::text[],
277                $9::text[], $10::text[], $11::text[],
278                $12::text[], $13::int8[], $14::int8[]
279            )
280            ON CONFLICT (chain_id, number) DO NOTHING
281           ",
282        )
283        .bind(chain_id as i32)
284        .bind(&numbers[..])
285        .bind(&hashes[..])
286        .bind(&parent_hashes[..])
287        .bind(&miners[..])
288        .bind(&gas_limits[..])
289        .bind(&gas_useds[..])
290        .bind(&timestamps[..])
291        .bind(&base_fee_per_gases as &[Option<String>])
292        .bind(&blob_gas_useds as &[Option<String>])
293        .bind(&excess_blob_gases as &[Option<String>])
294        .bind(&l1_gas_prices as &[Option<String>])
295        .bind(&l1_gas_useds as &[Option<i64>])
296        .bind(&l1_fee_scalars as &[Option<i64>])
297        .execute(&self.pool)
298        .await
299        .map(|_| ())
300        .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
301    }
302
303    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
304    ///
305    /// This method is significantly faster than INSERT for bulk operations as it bypasses
306    /// SQL parsing and uses PostgreSQL's native binary protocol.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the COPY operation fails.
311    pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
312        let copy_handler = PostgresCopyHandler::new(&self.pool);
313        copy_handler.copy_blocks(chain_id, blocks).await
314    }
315
316    /// Inserts tokens using PostgreSQL COPY BINARY for maximum performance.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the COPY operation fails.
321    pub async fn add_tokens_copy(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
322        let copy_handler = PostgresCopyHandler::new(&self.pool);
323        copy_handler.copy_tokens(chain_id, tokens).await
324    }
325
326    /// Inserts pools using PostgreSQL COPY BINARY for maximum performance.
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the COPY operation fails.
331    pub async fn add_pools_copy(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
332        let copy_handler = PostgresCopyHandler::new(&self.pool);
333        copy_handler.copy_pools(chain_id, pools).await
334    }
335
336    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
337    ///
338    /// This method is significantly faster than INSERT for bulk operations as it bypasses
339    /// SQL parsing and uses PostgreSQL's native binary protocol.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if the COPY operation fails.
344    pub async fn add_pool_swaps_copy(
345        &self,
346        chain_id: u32,
347        swaps: &[PoolSwap],
348    ) -> anyhow::Result<()> {
349        let copy_handler = PostgresCopyHandler::new(&self.pool);
350        copy_handler.copy_pool_swaps(chain_id, swaps).await
351    }
352
353    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
354    ///
355    /// This method is significantly faster than INSERT for bulk operations as it bypasses
356    /// SQL parsing and uses PostgreSQL's native binary protocol.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the COPY operation fails.
361    pub async fn add_pool_liquidity_updates_copy(
362        &self,
363        chain_id: u32,
364        updates: &[PoolLiquidityUpdate],
365    ) -> anyhow::Result<()> {
366        let copy_handler = PostgresCopyHandler::new(&self.pool);
367        copy_handler
368            .copy_pool_liquidity_updates(chain_id, updates)
369            .await
370    }
371
372    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
373    ///
374    /// This method is significantly faster than INSERT for bulk operations as it bypasses
375    /// SQL parsing and most database validation checks.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the COPY operation fails.
380    pub async fn copy_pool_fee_collects_batch(
381        &self,
382        chain_id: u32,
383        collects: &[PoolFeeCollect],
384    ) -> anyhow::Result<()> {
385        let copy_handler = PostgresCopyHandler::new(&self.pool);
386        copy_handler.copy_pool_collects(chain_id, collects).await
387    }
388
389    /// Retrieves block timestamps for a given chain starting from a specific block number.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the database query fails.
394    pub async fn load_block_timestamps(
395        &self,
396        chain: SharedChain,
397        from_block: u64,
398    ) -> anyhow::Result<Vec<BlockTimestampRow>> {
399        sqlx::query_as::<_, BlockTimestampRow>(
400            r"
401            SELECT
402                number,
403                timestamp
404            FROM block
405            WHERE chain_id = $1 AND number >= $2
406            ORDER BY number ASC
407            ",
408        )
409        .bind(chain.chain_id as i32)
410        .bind(from_block as i64)
411        .fetch_all(&self.pool)
412        .await
413        .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
414    }
415
416    /// Adds or updates a DEX (Decentralized Exchange) record in the database.
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if the database operation fails.
421    pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
422        sqlx::query(
423            r"
424            INSERT INTO dex (
425                chain_id, name, factory_address, creation_block
426            ) VALUES ($1, $2, $3, $4)
427            ON CONFLICT (chain_id, name)
428            DO UPDATE
429            SET
430                factory_address = $3,
431                creation_block = $4
432        ",
433        )
434        .bind(dex.chain.chain_id as i32)
435        .bind(dex.name.to_string())
436        .bind(dex.factory.to_string())
437        .bind(dex.factory_creation_block as i64)
438        .execute(&self.pool)
439        .await
440        .map(|_| ())
441        .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
442    }
443
444    /// Adds or updates a liquidity pool/pair record in the database.
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if the database operation fails.
449    pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
450        sqlx::query(
451            r"
452            INSERT INTO pool (
453                chain_id, address, dex_name, creation_block,
454                token0_chain, token0_address,
455                token1_chain, token1_address,
456                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
457            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
458            ON CONFLICT (chain_id, address)
459            DO UPDATE
460            SET
461                dex_name = $3,
462                creation_block = $4,
463                token0_chain = $5,
464                token0_address = $6,
465                token1_chain = $7,
466                token1_address = $8,
467                fee = $9,
468                tick_spacing = $10,
469                initial_tick = $11,
470                initial_sqrt_price_x96 = $12
471        ",
472        )
473        .bind(pool.chain.chain_id as i32)
474        .bind(pool.address.to_string())
475        .bind(pool.dex.name.to_string())
476        .bind(pool.creation_block as i64)
477        .bind(pool.token0.chain.chain_id as i32)
478        .bind(pool.token0.address.to_string())
479        .bind(pool.token1.chain.chain_id as i32)
480        .bind(pool.token1.address.to_string())
481        .bind(pool.fee.map(|fee| fee as i32))
482        .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
483        .bind(pool.initial_tick)
484        .bind(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()))
485        .execute(&self.pool)
486        .await
487        .map(|_| ())
488        .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
489    }
490
491    /// Inserts multiple pools in a single database operation using UNNEST for optimal performance.
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if the database operation fails.
496    pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
497        if pools.is_empty() {
498            return Ok(());
499        }
500
501        // Prepare vectors for each column
502        let len = pools.len();
503        let mut addresses: Vec<String> = Vec::with_capacity(len);
504        let mut dex_names: Vec<String> = Vec::with_capacity(len);
505        let mut creation_blocks: Vec<i64> = Vec::with_capacity(len);
506        let mut token0_chains: Vec<i32> = Vec::with_capacity(len);
507        let mut token0_addresses: Vec<String> = Vec::with_capacity(len);
508        let mut token1_chains: Vec<i32> = Vec::with_capacity(len);
509        let mut token1_addresses: Vec<String> = Vec::with_capacity(len);
510        let mut fees: Vec<Option<i32>> = Vec::with_capacity(len);
511        let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(len);
512        let mut initial_ticks: Vec<Option<i32>> = Vec::with_capacity(len);
513        let mut initial_sqrt_price_x96s: Vec<Option<String>> = Vec::with_capacity(len);
514        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
515
516        // Fill vectors from pools
517        for pool in pools {
518            chain_ids.push(pool.chain.chain_id as i32);
519            addresses.push(pool.address.to_string());
520            dex_names.push(pool.dex.name.to_string());
521            creation_blocks.push(pool.creation_block as i64);
522            token0_chains.push(pool.token0.chain.chain_id as i32);
523            token0_addresses.push(pool.token0.address.to_string());
524            token1_chains.push(pool.token1.chain.chain_id as i32);
525            token1_addresses.push(pool.token1.address.to_string());
526            fees.push(pool.fee.map(|fee| fee as i32));
527            tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
528            initial_ticks.push(pool.initial_tick);
529            initial_sqrt_price_x96s
530                .push(pool.initial_sqrt_price_x96.as_ref().map(|p| p.to_string()));
531        }
532
533        // Execute batch insert with UNNEST
534        sqlx::query(
535            r"
536            INSERT INTO pool (
537                chain_id, address, dex_name, creation_block,
538                token0_chain, token0_address,
539                token1_chain, token1_address,
540                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
541            )
542            SELECT *
543            FROM UNNEST(
544                $1::int4[], $2::text[], $3::text[], $4::int8[],
545                $5::int4[], $6::text[], $7::int4[], $8::text[],
546                $9::int4[], $10::int4[], $11::int4[], $12::text[]
547            )
548            ON CONFLICT (chain_id, address) DO NOTHING
549           ",
550        )
551        .bind(&chain_ids[..])
552        .bind(&addresses[..])
553        .bind(&dex_names[..])
554        .bind(&creation_blocks[..])
555        .bind(&token0_chains[..])
556        .bind(&token0_addresses[..])
557        .bind(&token1_chains[..])
558        .bind(&token1_addresses[..])
559        .bind(&fees[..])
560        .bind(&tick_spacings[..])
561        .bind(&initial_ticks[..])
562        .bind(&initial_sqrt_price_x96s[..])
563        .execute(&self.pool)
564        .await
565        .map(|_| ())
566        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
567    }
568
569    /// Inserts multiple pool swaps in a single database operation using UNNEST for optimal performance.
570    ///
571    /// # Errors
572    ///
573    /// Returns an error if the database operation fails.
574    pub async fn add_pool_swaps_batch(
575        &self,
576        chain_id: u32,
577        swaps: &[PoolSwap],
578    ) -> anyhow::Result<()> {
579        if swaps.is_empty() {
580            return Ok(());
581        }
582
583        // Prepare vectors for each column
584        let len = swaps.len();
585        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
586        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
587        let mut blocks: Vec<i64> = Vec::with_capacity(len);
588        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
589        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
590        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
591        let mut senders: Vec<String> = Vec::with_capacity(len);
592        let mut recipients: Vec<String> = Vec::with_capacity(len);
593        let mut sqrt_price_x96s: Vec<String> = Vec::with_capacity(len);
594        let mut liquidities: Vec<String> = Vec::with_capacity(len);
595        let mut ticks: Vec<i32> = Vec::with_capacity(len);
596        let mut amount0s: Vec<String> = Vec::with_capacity(len);
597        let mut amount1s: Vec<String> = Vec::with_capacity(len);
598        let mut order_sides: Vec<Option<String>> = Vec::with_capacity(len);
599        let mut base_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
600        let mut quote_quantities: Vec<Option<Decimal>> = Vec::with_capacity(len);
601        let mut spot_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
602        let mut execution_prices: Vec<Option<Decimal>> = Vec::with_capacity(len);
603
604        // Fill vectors from swaps
605        for swap in swaps {
606            chain_ids.push(chain_id as i32);
607            pool_addresses.push(swap.pool_address.to_string());
608            blocks.push(swap.block as i64);
609            transaction_hashes.push(swap.transaction_hash.clone());
610            transaction_indices.push(swap.transaction_index as i32);
611            log_indices.push(swap.log_index as i32);
612            senders.push(swap.sender.to_string());
613            recipients.push(swap.recipient.to_string());
614            sqrt_price_x96s.push(swap.sqrt_price_x96.to_string());
615            liquidities.push(swap.liquidity.to_string());
616            ticks.push(swap.tick);
617            amount0s.push(swap.amount0.to_string());
618            amount1s.push(swap.amount1.to_string());
619
620            // Extract trade_info fields if available
621            if let Some(ref trade_info) = swap.trade_info {
622                order_sides.push(Some(trade_info.order_side.to_string()));
623                base_quantities.push(Some(trade_info.quantity_base.as_decimal()));
624                quote_quantities.push(Some(trade_info.quantity_quote.as_decimal()));
625                spot_prices.push(Some(trade_info.spot_price.as_decimal()));
626                execution_prices.push(Some(trade_info.execution_price.as_decimal()));
627            } else {
628                order_sides.push(None);
629                base_quantities.push(None);
630                quote_quantities.push(None);
631                spot_prices.push(None);
632                execution_prices.push(None);
633            }
634        }
635
636        // Execute batch insert with UNNEST
637        sqlx::query(
638            r"
639            INSERT INTO pool_swap_event (
640                chain_id, pool_address, block, transaction_hash, transaction_index,
641                log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
642                order_side, base_quantity, quote_quantity, spot_price, execution_price
643            )
644            SELECT
645                chain_id, pool_address, block, transaction_hash, transaction_index, log_index, sender, recipient,
646                sqrt_price_x96::U160, liquidity::U128, tick, amount0::I256, amount1::I256,
647                order_side, base_quantity, quote_quantity, spot_price, execution_price
648            FROM UNNEST(
649                $1::INT[], $2::TEXT[], $3::BIGINT[], $4::TEXT[], $5::INT[], $6::INT[],
650                $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::INT[], $12::TEXT[], $13::TEXT[],
651                $14::TEXT[], $15, $16, $17, $18
652            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
653                   log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
654                   order_side, base_quantity, quote_quantity, spot_price, execution_price)
655            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
656           ",
657        )
658        .bind(&chain_ids[..])
659        .bind(&pool_addresses[..])
660        .bind(&blocks[..])
661        .bind(&transaction_hashes[..])
662        .bind(&transaction_indices[..])
663        .bind(&log_indices[..])
664        .bind(&senders[..])
665        .bind(&recipients[..])
666        .bind(&sqrt_price_x96s[..])
667        .bind(&liquidities[..])
668        .bind(&ticks[..])
669        .bind(&amount0s[..])
670        .bind(&amount1s[..])
671        .bind(&order_sides[..])
672        .bind(&base_quantities[..])
673        .bind(&quote_quantities[..])
674        .bind(&spot_prices[..])
675        .bind(&execution_prices[..])
676        .execute(&self.pool)
677        .await
678        .map(|_| ())
679        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
680    }
681
682    /// Inserts multiple pool liquidity updates in a single database operation using UNNEST for optimal performance.
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the database operation fails.
687    pub async fn add_pool_liquidity_updates_batch(
688        &self,
689        chain_id: u32,
690        updates: &[PoolLiquidityUpdate],
691    ) -> anyhow::Result<()> {
692        if updates.is_empty() {
693            return Ok(());
694        }
695
696        // Prepare vectors for each column
697        let len = updates.len();
698        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
699        let mut blocks: Vec<i64> = Vec::with_capacity(len);
700        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
701        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
702        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
703        let mut event_types: Vec<String> = Vec::with_capacity(len);
704        let mut senders: Vec<Option<String>> = Vec::with_capacity(len);
705        let mut owners: Vec<String> = Vec::with_capacity(len);
706        let mut position_liquidities: Vec<String> = Vec::with_capacity(len);
707        let mut amount0s: Vec<String> = Vec::with_capacity(len);
708        let mut amount1s: Vec<String> = Vec::with_capacity(len);
709        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
710        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
711        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
712
713        // Fill vectors from updates
714        for update in updates {
715            chain_ids.push(chain_id as i32);
716            pool_addresses.push(update.pool_address.to_string());
717            blocks.push(update.block as i64);
718            transaction_hashes.push(update.transaction_hash.clone());
719            transaction_indices.push(update.transaction_index as i32);
720            log_indices.push(update.log_index as i32);
721            event_types.push(update.kind.to_string());
722            senders.push(update.sender.map(|s| s.to_string()));
723            owners.push(update.owner.to_string());
724            position_liquidities.push(update.position_liquidity.to_string());
725            amount0s.push(update.amount0.to_string());
726            amount1s.push(update.amount1.to_string());
727            tick_lowers.push(update.tick_lower);
728            tick_uppers.push(update.tick_upper);
729        }
730
731        // Execute batch insert with UNNEST
732        sqlx::query(
733            r"
734            INSERT INTO pool_liquidity_event (
735                chain_id, pool_address, block, transaction_hash, transaction_index,
736                log_index, event_type, sender, owner, position_liquidity,
737                amount0, amount1, tick_lower, tick_upper
738            )
739            SELECT
740                chain_id, pool_address, block, transaction_hash, transaction_index,
741                log_index, event_type, sender, owner, position_liquidity::u128,
742                amount0::U256, amount1::U256, tick_lower, tick_upper
743            FROM UNNEST(
744                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
745                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[],
746                $11::TEXT[], $12::TEXT[], $13::INT[], $14::INT[]
747            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
748                   log_index, event_type, sender, owner, position_liquidity,
749                   amount0, amount1, tick_lower, tick_upper)
750            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
751           ",
752        )
753        .bind(&chain_ids[..])
754        .bind(&pool_addresses[..])
755        .bind(&blocks[..])
756        .bind(&transaction_hashes[..])
757        .bind(&transaction_indices[..])
758        .bind(&log_indices[..])
759        .bind(&event_types[..])
760        .bind(&senders[..])
761        .bind(&owners[..])
762        .bind(&position_liquidities[..])
763        .bind(&amount0s[..])
764        .bind(&amount1s[..])
765        .bind(&tick_lowers[..])
766        .bind(&tick_uppers[..])
767        .execute(&self.pool)
768        .await
769        .map(|_| ())
770        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
771    }
772
773    /// Adds or updates a token record in the database.
774    ///
775    /// # Errors
776    ///
777    /// Returns an error if the database operation fails.
778    pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
779        sqlx::query(
780            r"
781            INSERT INTO token (
782                chain_id, address, name, symbol, decimals
783            ) VALUES ($1, $2, $3, $4, $5)
784            ON CONFLICT (chain_id, address)
785            DO UPDATE
786            SET
787                name = $3,
788                symbol = $4,
789                decimals = $5
790        ",
791        )
792        .bind(token.chain.chain_id as i32)
793        .bind(token.address.to_string())
794        .bind(token.name.as_str())
795        .bind(token.symbol.as_str())
796        .bind(i32::from(token.decimals))
797        .execute(&self.pool)
798        .await
799        .map(|_| ())
800        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
801    }
802
803    /// Records an invalid token address with associated error information.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the database insertion fails.
808    pub async fn add_invalid_token(
809        &self,
810        chain_id: u32,
811        address: &Address,
812        error_string: &str,
813    ) -> anyhow::Result<()> {
814        sqlx::query(
815            r"
816            INSERT INTO token (
817                chain_id, address, error
818            ) VALUES ($1, $2, $3)
819            ON CONFLICT (chain_id, address)
820            DO NOTHING;
821        ",
822        )
823        .bind(chain_id as i32)
824        .bind(address.to_string())
825        .bind(error_string)
826        .execute(&self.pool)
827        .await
828        .map(|_| ())
829        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
830    }
831
832    /// Persists a token swap transaction event to the `pool_swap` table.
833    ///
834    /// # Errors
835    ///
836    /// Returns an error if the database operation fails.
837    pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
838        // Extract trade_info fields if available
839        let (order_side, base_quantity, quote_quantity, spot_price, execution_price) =
840            if let Some(ref trade_info) = swap.trade_info {
841                (
842                    Some(trade_info.order_side.to_string()),
843                    Some(trade_info.quantity_base.as_decimal()),
844                    Some(trade_info.quantity_quote.as_decimal()),
845                    Some(trade_info.spot_price.as_decimal()),
846                    Some(trade_info.execution_price.as_decimal()),
847                )
848            } else {
849                (None, None, None, None, None)
850            };
851
852        sqlx::query(
853            r"
854            INSERT INTO pool_swap_event (
855                chain_id, pool_address, block, transaction_hash, transaction_index,
856                log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
857                order_side, base_quantity, quote_quantity, spot_price, execution_price
858            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::U160, $10::U128, $11, $12::I256, $13::I256, $14, $15, $16, $17, $18)
859            ON CONFLICT (chain_id, transaction_hash, log_index)
860            DO NOTHING
861        ",
862        )
863        .bind(chain_id as i32)
864        .bind(swap.pool_address.to_string())
865        .bind(swap.block as i64)
866        .bind(swap.transaction_hash.as_str())
867        .bind(swap.transaction_index as i32)
868        .bind(swap.log_index as i32)
869        .bind(swap.sender.to_string())
870        .bind(swap.recipient.to_string())
871        .bind(swap.sqrt_price_x96.to_string())
872        .bind(swap.liquidity.to_string())
873        .bind(swap.tick)
874        .bind(swap.amount0.to_string())
875        .bind(swap.amount1.to_string())
876        .bind(order_side)
877        .bind(base_quantity)
878        .bind(quote_quantity)
879        .bind(spot_price)
880        .bind(execution_price)
881        .execute(&self.pool)
882        .await
883        .map(|_| ())
884        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
885    }
886
887    /// Persists a liquidity position change (mint/burn) event to the `pool_liquidity` table.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the database operation fails.
892    pub async fn add_pool_liquidity_update(
893        &self,
894        chain_id: u32,
895        liquidity_update: &PoolLiquidityUpdate,
896    ) -> anyhow::Result<()> {
897        sqlx::query(
898            r"
899            INSERT INTO pool_liquidity_event (
900                chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
901                event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
902            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
903            ON CONFLICT (chain_id, transaction_hash, log_index)
904            DO NOTHING
905        ",
906        )
907        .bind(chain_id as i32)
908        .bind(liquidity_update.pool_address.to_string())
909        .bind(liquidity_update.block as i64)
910        .bind(liquidity_update.transaction_hash.as_str())
911        .bind(liquidity_update.transaction_index as i32)
912        .bind(liquidity_update.log_index as i32)
913        .bind(liquidity_update.kind.to_string())
914        .bind(liquidity_update.sender.map(|sender| sender.to_string()))
915        .bind(liquidity_update.owner.to_string())
916        .bind(U128Pg(liquidity_update.position_liquidity))
917        .bind(U256Pg(liquidity_update.amount0))
918        .bind(U256Pg(liquidity_update.amount1))
919        .bind(liquidity_update.tick_lower)
920        .bind(liquidity_update.tick_upper)
921        .execute(&self.pool)
922        .await
923        .map(|_| ())
924        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
925    }
926
927    /// Retrieves all valid token records for the given chain and converts them into `Token` domain objects.
928    ///
929    /// Only returns tokens that do not contain error information, filtering out invalid tokens
930    /// that were previously recorded with error details.
931    ///
932    /// # Errors
933    ///
934    /// Returns an error if the database query fails.
935    pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
936        sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
937            .bind(chain.chain_id as i32)
938            .fetch_all(&self.pool)
939            .await
940            .map(|rows| {
941                rows.into_iter()
942                    .map(|token_row| {
943                        Token::new(
944                            chain.clone(),
945                            token_row.address,
946                            token_row.name,
947                            token_row.symbol,
948                            token_row.decimals as u8,
949                        )
950                    })
951                    .collect::<Vec<_>>()
952            })
953            .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
954    }
955
956    /// Retrieves all invalid token addresses for a given chain.
957    ///
958    /// # Errors
959    ///
960    /// Returns an error if the database query fails or address validation fails.
961    pub async fn load_invalid_token_addresses(
962        &self,
963        chain_id: u32,
964    ) -> anyhow::Result<Vec<Address>> {
965        sqlx::query_as::<_, (String,)>(
966            "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
967        )
968        .bind(chain_id as i32)
969        .fetch_all(&self.pool)
970        .await?
971        .into_iter()
972        .map(|(address,)| validate_address(&address))
973        .collect::<Result<Vec<_>, _>>()
974        .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
975    }
976
977    /// Loads pool data from the database for the specified chain and DEX.
978    ///
979    /// # Errors
980    ///
981    /// Returns an error if the database query fails, the connection to the database is lost, or the query parameters are invalid.
982    pub async fn load_pools(
983        &self,
984        chain: SharedChain,
985        dex_id: &str,
986    ) -> anyhow::Result<Vec<PoolRow>> {
987        sqlx::query_as::<_, PoolRow>(
988            r"
989            SELECT
990                address,
991                dex_name,
992                creation_block,
993                token0_chain,
994                token0_address,
995                token1_chain,
996                token1_address,
997                fee,
998                tick_spacing,
999                initial_tick,
1000                initial_sqrt_price_x96
1001            FROM pool
1002            WHERE chain_id = $1 AND dex_name = $2
1003            ORDER BY creation_block ASC
1004        ",
1005        )
1006        .bind(chain.chain_id as i32)
1007        .bind(dex_id)
1008        .fetch_all(&self.pool)
1009        .await
1010        .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
1011    }
1012
1013    /// Toggles performance optimization settings for sync operations.
1014    ///
1015    /// When enabled (true), applies settings for maximum write performance:
1016    /// - `synchronous_commit` = OFF
1017    /// - `work_mem` increased for bulk operations
1018    ///
1019    /// When disabled (false), restores default safe settings:
1020    /// - `synchronous_commit` = ON (data safety)
1021    /// - `work_mem` back to default
1022    ///
1023    /// # Errors
1024    ///
1025    /// Returns an error if the database operations fail.
1026    pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
1027        if enable {
1028            tracing::info!("Enabling performance sync settings for bulk operations");
1029
1030            // Set synchronous_commit to OFF for maximum write performance
1031            sqlx::query("SET synchronous_commit = OFF")
1032                .execute(&self.pool)
1033                .await
1034                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;
1035
1036            // Increase work_mem for bulk operations
1037            sqlx::query("SET work_mem = '256MB'")
1038                .execute(&self.pool)
1039                .await
1040                .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;
1041
1042            tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
1043        } else {
1044            tracing::info!("Restoring default safe database performance settings");
1045
1046            // Restore synchronous_commit to ON for data safety
1047            sqlx::query("SET synchronous_commit = ON")
1048                .execute(&self.pool)
1049                .await
1050                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;
1051
1052            // Reset work_mem to default
1053            sqlx::query("RESET work_mem")
1054                .execute(&self.pool)
1055                .await
1056                .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
1057        }
1058
1059        Ok(())
1060    }
1061
1062    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns an error if the database operation fails.
1067    pub async fn update_dex_last_synced_block(
1068        &self,
1069        chain_id: u32,
1070        dex: &DexType,
1071        block_number: u64,
1072    ) -> anyhow::Result<()> {
1073        sqlx::query(
1074            r"
1075            UPDATE dex
1076            SET last_full_sync_pools_block_number = $3
1077            WHERE chain_id = $1 AND name = $2
1078            ",
1079        )
1080        .bind(chain_id as i32)
1081        .bind(dex.to_string())
1082        .bind(block_number as i64)
1083        .execute(&self.pool)
1084        .await
1085        .map(|_| ())
1086        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1087    }
1088
1089    /// Updates the last synced block number for a pool.
1090    ///
1091    /// # Errors
1092    ///
1093    /// Returns an error if the database update fails.
1094    pub async fn update_pool_last_synced_block(
1095        &self,
1096        chain_id: u32,
1097        dex: &DexType,
1098        pool_address: &Address,
1099        block_number: u64,
1100    ) -> anyhow::Result<()> {
1101        sqlx::query(
1102            r"
1103            UPDATE pool
1104            SET last_full_sync_block_number = $4
1105            WHERE chain_id = $1
1106            AND dex_name = $2
1107            AND address = $3
1108            ",
1109        )
1110        .bind(chain_id as i32)
1111        .bind(dex.to_string())
1112        .bind(pool_address.to_string())
1113        .bind(block_number as i64)
1114        .execute(&self.pool)
1115        .await
1116        .map(|_| ())
1117        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1118    }
1119
1120    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
1121    ///
1122    /// # Errors
1123    ///
1124    /// Returns an error if the database query fails.
1125    pub async fn get_dex_last_synced_block(
1126        &self,
1127        chain_id: u32,
1128        dex: &DexType,
1129    ) -> anyhow::Result<Option<u64>> {
1130        let result = sqlx::query_as::<_, (Option<i64>,)>(
1131            r#"
1132            SELECT
1133                last_full_sync_pools_block_number
1134            FROM dex
1135            WHERE chain_id = $1
1136            AND name = $2
1137            "#,
1138        )
1139        .bind(chain_id as i32)
1140        .bind(dex.to_string())
1141        .fetch_optional(&self.pool)
1142        .await
1143        .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;
1144
1145        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1146    }
1147
1148    /// Retrieves the last synced block number for a pool.
1149    ///
1150    /// # Errors
1151    ///
1152    /// Returns an error if the database query fails.
1153    pub async fn get_pool_last_synced_block(
1154        &self,
1155        chain_id: u32,
1156        dex: &DexType,
1157        pool_address: &Address,
1158    ) -> anyhow::Result<Option<u64>> {
1159        let result = sqlx::query_as::<_, (Option<i64>,)>(
1160            r#"
1161            SELECT
1162                last_full_sync_block_number
1163            FROM pool
1164            WHERE chain_id = $1
1165            AND dex_name = $2
1166            AND address = $3
1167            "#,
1168        )
1169        .bind(chain_id as i32)
1170        .bind(dex.to_string())
1171        .bind(pool_address.to_string())
1172        .fetch_optional(&self.pool)
1173        .await
1174        .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;
1175
1176        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1177    }
1178
1179    /// Retrieves the maximum block number from a specific table for a given pool.
1180    /// This is useful to detect orphaned data where events were inserted but progress wasn't updated.
1181    ///
1182    /// # Errors
1183    ///
1184    /// Returns an error if the database query fails.
1185    pub async fn get_table_last_block(
1186        &self,
1187        chain_id: u32,
1188        table_name: &str,
1189        pool_address: &Address,
1190    ) -> anyhow::Result<Option<u64>> {
1191        let query = format!(
1192            "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
1193            table_name
1194        );
1195        let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
1196            .bind(chain_id as i32)
1197            .bind(pool_address.to_string())
1198            .fetch_optional(&self.pool)
1199            .await
1200            .map_err(|e| {
1201                anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
1202            })?;
1203
1204        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
1205    }
1206
1207    /// Adds a batch of pool fee collect events to the database using batch operations.
1208    ///
1209    /// # Errors
1210    ///
1211    /// Returns an error if the database operation fails.
1212    pub async fn add_pool_collects_batch(
1213        &self,
1214        chain_id: u32,
1215        collects: &[PoolFeeCollect],
1216    ) -> anyhow::Result<()> {
1217        if collects.is_empty() {
1218            return Ok(());
1219        }
1220
1221        // Prepare vectors for each column
1222        let len = collects.len();
1223        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1224        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1225        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1226        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1227        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1228        let mut owners: Vec<String> = Vec::with_capacity(len);
1229        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1230        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1231        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1232        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1233        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1234
1235        // Fill vectors from collects
1236        for collect in collects {
1237            chain_ids.push(chain_id as i32);
1238            pool_addresses.push(collect.pool_address.to_string());
1239            blocks.push(collect.block as i64);
1240            transaction_hashes.push(collect.transaction_hash.clone());
1241            transaction_indices.push(collect.transaction_index as i32);
1242            log_indices.push(collect.log_index as i32);
1243            owners.push(collect.owner.to_string());
1244            amount0s.push(collect.amount0.to_string());
1245            amount1s.push(collect.amount1.to_string());
1246            tick_lowers.push(collect.tick_lower);
1247            tick_uppers.push(collect.tick_upper);
1248        }
1249
1250        // Execute batch insert with UNNEST
1251        sqlx::query(
1252            r"
1253            INSERT INTO pool_collect_event (
1254                chain_id, pool_address, block, transaction_hash, transaction_index,
1255                log_index, owner, amount0, amount1, tick_lower, tick_upper
1256            )
1257            SELECT
1258                chain_id, pool_address, block, transaction_hash, transaction_index,
1259                log_index, owner, amount0::U256, amount1::U256, tick_lower, tick_upper
1260            FROM UNNEST(
1261                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1262                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::INT[], $11::INT[]
1263            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1264                   log_index, owner, amount0, amount1, tick_lower, tick_upper)
1265            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1266           ",
1267        )
1268        .bind(&chain_ids[..])
1269        .bind(&pool_addresses[..])
1270        .bind(&blocks[..])
1271        .bind(&transaction_hashes[..])
1272        .bind(&transaction_indices[..])
1273        .bind(&log_indices[..])
1274        .bind(&owners[..])
1275        .bind(&amount0s[..])
1276        .bind(&amount1s[..])
1277        .bind(&tick_lowers[..])
1278        .bind(&tick_uppers[..])
1279        .execute(&self.pool)
1280        .await
1281        .map(|_| ())
1282        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_fee_collect table: {e}"))
1283    }
1284
1285    /// Inserts multiple pool flash events in a single database operation using UNNEST for optimal performance.
1286    ///
1287    /// # Errors
1288    ///
1289    /// Returns an error if the database operation fails.
1290    pub async fn add_pool_flash_batch(
1291        &self,
1292        chain_id: u32,
1293        flash_events: &[PoolFlash],
1294    ) -> anyhow::Result<()> {
1295        if flash_events.is_empty() {
1296            return Ok(());
1297        }
1298
1299        // Prepare vectors for each column
1300        let len = flash_events.len();
1301        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1302        let mut blocks: Vec<i64> = Vec::with_capacity(len);
1303        let mut transaction_hashes: Vec<String> = Vec::with_capacity(len);
1304        let mut transaction_indices: Vec<i32> = Vec::with_capacity(len);
1305        let mut log_indices: Vec<i32> = Vec::with_capacity(len);
1306        let mut senders: Vec<String> = Vec::with_capacity(len);
1307        let mut recipients: Vec<String> = Vec::with_capacity(len);
1308        let mut amount0s: Vec<String> = Vec::with_capacity(len);
1309        let mut amount1s: Vec<String> = Vec::with_capacity(len);
1310        let mut paid0s: Vec<String> = Vec::with_capacity(len);
1311        let mut paid1s: Vec<String> = Vec::with_capacity(len);
1312        let mut chain_ids: Vec<i32> = Vec::with_capacity(len);
1313
1314        // Fill vectors from flash events
1315        for flash in flash_events {
1316            chain_ids.push(chain_id as i32);
1317            pool_addresses.push(flash.pool_address.to_string());
1318            blocks.push(flash.block as i64);
1319            transaction_hashes.push(flash.transaction_hash.clone());
1320            transaction_indices.push(flash.transaction_index as i32);
1321            log_indices.push(flash.log_index as i32);
1322            senders.push(flash.sender.to_string());
1323            recipients.push(flash.recipient.to_string());
1324            amount0s.push(flash.amount0.to_string());
1325            amount1s.push(flash.amount1.to_string());
1326            paid0s.push(flash.paid0.to_string());
1327            paid1s.push(flash.paid1.to_string());
1328        }
1329
1330        // Execute batch insert with UNNEST
1331        sqlx::query(
1332            r"
1333            INSERT INTO pool_flash_event (
1334                chain_id, pool_address, block, transaction_hash, transaction_index,
1335                log_index, sender, recipient, amount0, amount1, paid0, paid1
1336            )
1337            SELECT
1338                chain_id, pool_address, block, transaction_hash, transaction_index,
1339                log_index, sender, recipient, amount0::U256, amount1::U256, paid0::U256, paid1::U256
1340            FROM UNNEST(
1341                $1::INT[], $2::TEXT[], $3::INT[], $4::TEXT[], $5::INT[],
1342                $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[], $10::TEXT[], $11::TEXT[], $12::TEXT[]
1343            ) AS t(chain_id, pool_address, block, transaction_hash, transaction_index,
1344                   log_index, sender, recipient, amount0, amount1, paid0, paid1)
1345            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
1346           ",
1347        )
1348        .bind(&chain_ids[..])
1349        .bind(&pool_addresses[..])
1350        .bind(&blocks[..])
1351        .bind(&transaction_hashes[..])
1352        .bind(&transaction_indices[..])
1353        .bind(&log_indices[..])
1354        .bind(&senders[..])
1355        .bind(&recipients[..])
1356        .bind(&amount0s[..])
1357        .bind(&amount1s[..])
1358        .bind(&paid0s[..])
1359        .bind(&paid1s[..])
1360        .execute(&self.pool)
1361        .await
1362        .map(|_| ())
1363        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_flash_event table: {e}"))
1364    }
1365
1366    /// Adds a pool snapshot to the database.
1367    ///
1368    /// # Errors
1369    ///
1370    /// Returns an error if the database insert fails.
1371    pub async fn add_pool_snapshot(
1372        &self,
1373        chain_id: u32,
1374        pool_address: &Address,
1375        snapshot: &PoolSnapshot,
1376    ) -> anyhow::Result<()> {
1377        sqlx::query(
1378            r"
1379            INSERT INTO pool_snapshot (
1380                chain_id, pool_address, block, transaction_index, log_index, transaction_hash,
1381                current_tick, price_sqrt_ratio_x96, liquidity,
1382                protocol_fees_token0, protocol_fees_token1, fee_protocol,
1383                fee_growth_global_0, fee_growth_global_1,
1384                total_amount0_deposited, total_amount1_deposited,
1385                total_amount0_collected, total_amount1_collected,
1386                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1387                liquidity_utilization_rate
1388            ) VALUES (
1389                $1, $2, $3, $4, $5, $6,
1390                $7, $8::U160, $9::U128, $10::U256, $11::U256, $12,
1391                $13::U256, $14::U256, $15::U256, $16::U256, $17::U256, $18::U256,
1392                $19, $20, $21, $22, $23, $24
1393            )
1394            ON CONFLICT (chain_id, pool_address, block, transaction_index, log_index)
1395            DO NOTHING
1396            ",
1397        )
1398        .bind(chain_id as i32)
1399        .bind(pool_address.to_string())
1400        .bind(snapshot.block_position.number as i64)
1401        .bind(snapshot.block_position.transaction_index as i32)
1402        .bind(snapshot.block_position.log_index as i32)
1403        .bind(snapshot.block_position.transaction_hash.clone())
1404        .bind(snapshot.state.current_tick)
1405        .bind(snapshot.state.price_sqrt_ratio_x96.to_string())
1406        .bind(snapshot.state.liquidity.to_string())
1407        .bind(snapshot.state.protocol_fees_token0.to_string())
1408        .bind(snapshot.state.protocol_fees_token1.to_string())
1409        .bind(snapshot.state.fee_protocol as i16)
1410        .bind(snapshot.state.fee_growth_global_0.to_string())
1411        .bind(snapshot.state.fee_growth_global_1.to_string())
1412        .bind(snapshot.analytics.total_amount0_deposited.to_string())
1413        .bind(snapshot.analytics.total_amount1_deposited.to_string())
1414        .bind(snapshot.analytics.total_amount0_collected.to_string())
1415        .bind(snapshot.analytics.total_amount1_collected.to_string())
1416        .bind(snapshot.analytics.total_swaps as i32)
1417        .bind(snapshot.analytics.total_mints as i32)
1418        .bind(snapshot.analytics.total_burns as i32)
1419        .bind(snapshot.analytics.total_fee_collects as i32)
1420        .bind(snapshot.analytics.total_flashes as i32)
1421        .bind(snapshot.analytics.liquidity_utilization_rate)
1422        .execute(&self.pool)
1423        .await
1424        .map(|_| ())
1425        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_snapshot table: {e}"))
1426    }
1427
1428    /// Inserts multiple pool positions in a single database operation using UNNEST for optimal performance.
1429    ///
1430    /// # Errors
1431    ///
1432    /// Returns an error if the database operation fails.
1433    pub async fn add_pool_positions_batch(
1434        &self,
1435        chain_id: u32,
1436        snapshot_block: u64,
1437        snapshot_transaction_index: u32,
1438        snapshot_log_index: u32,
1439        positions: &[(Address, PoolPosition)],
1440    ) -> anyhow::Result<()> {
1441        if positions.is_empty() {
1442            return Ok(());
1443        }
1444
1445        // Prepare vectors for each column
1446        let len = positions.len();
1447        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1448        let mut owners: Vec<String> = Vec::with_capacity(len);
1449        let mut tick_lowers: Vec<i32> = Vec::with_capacity(len);
1450        let mut tick_uppers: Vec<i32> = Vec::with_capacity(len);
1451        let mut liquidities: Vec<String> = Vec::with_capacity(len);
1452        let mut fee_growth_inside_0_lasts: Vec<String> = Vec::with_capacity(len);
1453        let mut fee_growth_inside_1_lasts: Vec<String> = Vec::with_capacity(len);
1454        let mut tokens_owed_0s: Vec<String> = Vec::with_capacity(len);
1455        let mut tokens_owed_1s: Vec<String> = Vec::with_capacity(len);
1456        let mut total_amount0_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1457        let mut total_amount1_depositeds: Vec<Option<String>> = Vec::with_capacity(len);
1458        let mut total_amount0_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1459        let mut total_amount1_collecteds: Vec<Option<String>> = Vec::with_capacity(len);
1460
1461        // Fill vectors from positions
1462        for (pool_address, position) in positions {
1463            pool_addresses.push(pool_address.to_string());
1464            owners.push(position.owner.to_string());
1465            tick_lowers.push(position.tick_lower);
1466            tick_uppers.push(position.tick_upper);
1467            liquidities.push(position.liquidity.to_string());
1468            fee_growth_inside_0_lasts.push(position.fee_growth_inside_0_last.to_string());
1469            fee_growth_inside_1_lasts.push(position.fee_growth_inside_1_last.to_string());
1470            tokens_owed_0s.push(position.tokens_owed_0.to_string());
1471            tokens_owed_1s.push(position.tokens_owed_1.to_string());
1472            total_amount0_depositeds.push(Some(position.total_amount0_deposited.to_string()));
1473            total_amount1_depositeds.push(Some(position.total_amount1_deposited.to_string()));
1474            total_amount0_collecteds.push(Some(position.total_amount0_collected.to_string()));
1475            total_amount1_collecteds.push(Some(position.total_amount1_collected.to_string()));
1476        }
1477
1478        // Execute batch insert with UNNEST
1479        sqlx::query(
1480            r"
1481            INSERT INTO pool_position (
1482                chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1483                owner, tick_lower, tick_upper,
1484                liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1485                tokens_owed_0, tokens_owed_1,
1486                total_amount0_deposited, total_amount1_deposited,
1487                total_amount0_collected, total_amount1_collected
1488            )
1489            SELECT
1490                $1, pool_address, $2, $3, $4,
1491                owner, tick_lower, tick_upper,
1492                liquidity::U128, fee_growth_inside_0_last::U256, fee_growth_inside_1_last::U256,
1493                tokens_owed_0::U128, tokens_owed_1::U128,
1494                total_amount0_deposited::U256, total_amount1_deposited::U256,
1495                total_amount0_collected::U128, total_amount1_collected::U128
1496            FROM UNNEST(
1497                $5::TEXT[], $6::TEXT[], $7::INT[], $8::INT[], $9::TEXT[], $10::TEXT[],
1498                $11::TEXT[], $12::TEXT[], $13::TEXT[], $14::TEXT[], $15::TEXT[],
1499                $16::TEXT[], $17::TEXT[]
1500            ) AS t(pool_address, owner, tick_lower, tick_upper,
1501                   liquidity, fee_growth_inside_0_last, fee_growth_inside_1_last,
1502                   tokens_owed_0, tokens_owed_1,
1503                   total_amount0_deposited, total_amount1_deposited,
1504                   total_amount0_collected, total_amount1_collected)
1505            ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, owner, tick_lower, tick_upper)
1506            DO NOTHING
1507           ",
1508        )
1509        .bind(chain_id as i32)
1510        .bind(snapshot_block as i64)
1511        .bind(snapshot_transaction_index as i32)
1512        .bind(snapshot_log_index as i32)
1513        .bind(&pool_addresses[..])
1514        .bind(&owners[..])
1515        .bind(&tick_lowers[..])
1516        .bind(&tick_uppers[..])
1517        .bind(&liquidities[..])
1518        .bind(&fee_growth_inside_0_lasts[..])
1519        .bind(&fee_growth_inside_1_lasts[..])
1520        .bind(&tokens_owed_0s[..])
1521        .bind(&tokens_owed_1s[..])
1522        .bind(&total_amount0_depositeds as &[Option<String>])
1523        .bind(&total_amount1_depositeds as &[Option<String>])
1524        .bind(&total_amount0_collecteds as &[Option<String>])
1525        .bind(&total_amount1_collecteds as &[Option<String>])
1526        .execute(&self.pool)
1527        .await
1528        .map(|_| ())
1529        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_position table: {e}"))
1530    }
1531
1532    /// Inserts multiple pool ticks in a single database operation using UNNEST for optimal performance.
1533    ///
1534    /// # Errors
1535    ///
1536    /// Returns an error if the database operation fails.
1537    pub async fn add_pool_ticks_batch(
1538        &self,
1539        chain_id: u32,
1540        snapshot_block: u64,
1541        snapshot_transaction_index: u32,
1542        snapshot_log_index: u32,
1543        ticks: &[(Address, &PoolTick)],
1544    ) -> anyhow::Result<()> {
1545        if ticks.is_empty() {
1546            return Ok(());
1547        }
1548
1549        // Prepare vectors for each column
1550        let len = ticks.len();
1551        let mut pool_addresses: Vec<String> = Vec::with_capacity(len);
1552        let mut tick_values: Vec<i32> = Vec::with_capacity(len);
1553        let mut liquidity_grosses: Vec<String> = Vec::with_capacity(len);
1554        let mut liquidity_nets: Vec<String> = Vec::with_capacity(len);
1555        let mut fee_growth_outside_0s: Vec<String> = Vec::with_capacity(len);
1556        let mut fee_growth_outside_1s: Vec<String> = Vec::with_capacity(len);
1557        let mut initializeds: Vec<bool> = Vec::with_capacity(len);
1558        let mut last_updated_blocks: Vec<i64> = Vec::with_capacity(len);
1559
1560        // Fill vectors from ticks
1561        for (pool_address, tick) in ticks {
1562            pool_addresses.push(pool_address.to_string());
1563            tick_values.push(tick.value);
1564            liquidity_grosses.push(tick.liquidity_gross.to_string());
1565            liquidity_nets.push(tick.liquidity_net.to_string());
1566            fee_growth_outside_0s.push(tick.fee_growth_outside_0.to_string());
1567            fee_growth_outside_1s.push(tick.fee_growth_outside_1.to_string());
1568            initializeds.push(tick.initialized);
1569            last_updated_blocks.push(tick.last_updated_block as i64);
1570        }
1571
1572        // Execute batch insert with UNNEST
1573        sqlx::query(
1574            r"
1575            INSERT INTO pool_tick (
1576                chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index,
1577                tick_value, liquidity_gross, liquidity_net,
1578                fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block
1579            )
1580            SELECT
1581                $1, pool_address, $2, $3, $4,
1582                tick_value, liquidity_gross::U128, liquidity_net::I128,
1583                fee_growth_outside_0::U256, fee_growth_outside_1::U256, initialized, last_updated_block
1584            FROM UNNEST(
1585                $5::TEXT[], $6::INT[], $7::TEXT[], $8::TEXT[], $9::TEXT[],
1586                $10::TEXT[], $11::BOOLEAN[], $12::BIGINT[]
1587            ) AS t(pool_address, tick_value, liquidity_gross, liquidity_net,
1588                   fee_growth_outside_0, fee_growth_outside_1, initialized, last_updated_block)
1589            ON CONFLICT (chain_id, pool_address, snapshot_block, snapshot_transaction_index, snapshot_log_index, tick_value)
1590            DO NOTHING
1591           ",
1592        )
1593        .bind(chain_id as i32)
1594        .bind(snapshot_block as i64)
1595        .bind(snapshot_transaction_index as i32)
1596        .bind(snapshot_log_index as i32)
1597        .bind(&pool_addresses[..])
1598        .bind(&tick_values[..])
1599        .bind(&liquidity_grosses[..])
1600        .bind(&liquidity_nets[..])
1601        .bind(&fee_growth_outside_0s[..])
1602        .bind(&fee_growth_outside_1s[..])
1603        .bind(&initializeds[..])
1604        .bind(&last_updated_blocks[..])
1605        .execute(&self.pool)
1606        .await
1607        .map(|_| ())
1608        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_tick table: {e}"))
1609    }
1610
1611    /// Updates the initial price and tick for a pool.
1612    ///
1613    /// # Errors
1614    ///
1615    /// Returns an error if the database update fails.
1616    pub async fn update_pool_initial_price_tick(
1617        &self,
1618        chain_id: u32,
1619        initialize_event: &InitializeEvent,
1620    ) -> anyhow::Result<()> {
1621        sqlx::query(
1622            r"
1623            UPDATE pool
1624            SET
1625                initial_tick = $4,
1626                initial_sqrt_price_x96 = $5
1627            WHERE chain_id = $1
1628            AND dex_name = $2
1629            AND address = $3
1630            ",
1631        )
1632        .bind(chain_id as i32)
1633        .bind(initialize_event.dex.name.to_string())
1634        .bind(initialize_event.pool_address.to_string())
1635        .bind(initialize_event.tick)
1636        .bind(initialize_event.sqrt_price_x96.to_string())
1637        .execute(&self.pool)
1638        .await
1639        .map(|_| ())
1640        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
1641    }
1642
1643    /// Loads the latest valid pool snapshot from the database.
1644    ///
1645    /// Returns the most recent snapshot that has been validated against on-chain state.
1646    ///
1647    /// # Errors
1648    ///
1649    /// Returns an error if the database query fails.
1650    pub async fn load_latest_valid_pool_snapshot(
1651        &self,
1652        chain_id: u32,
1653        pool_address: &Address,
1654    ) -> anyhow::Result<Option<PoolSnapshot>> {
1655        let result = sqlx::query(
1656            r"
1657            SELECT
1658                block, transaction_index, log_index, transaction_hash,
1659                current_tick, price_sqrt_ratio_x96::TEXT, liquidity::TEXT,
1660                protocol_fees_token0::TEXT, protocol_fees_token1::TEXT, fee_protocol,
1661                fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
1662                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1663                total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1664                total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1665                liquidity_utilization_rate,
1666                (SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
1667            FROM pool_snapshot
1668            WHERE chain_id = $1 AND pool_address = $2 AND is_valid = TRUE
1669            ORDER BY block DESC, transaction_index DESC, log_index DESC
1670            LIMIT 1
1671            ",
1672        )
1673        .bind(chain_id as i32)
1674        .bind(pool_address.to_string())
1675        .fetch_optional(&self.pool)
1676        .await
1677        .map_err(|e| anyhow::anyhow!("Failed to load latest valid pool snapshot: {}", e))?;
1678
1679        if let Some(row) = result {
1680            // Parse snapshot state
1681            let block: i64 = row.get("block");
1682            let transaction_index: i32 = row.get("transaction_index");
1683            let log_index: i32 = row.get("log_index");
1684            let transaction_hash: String = row.get("transaction_hash");
1685
1686            let block_position = nautilus_model::defi::data::block::BlockPosition::new(
1687                block as u64,
1688                transaction_hash,
1689                transaction_index as u32,
1690                log_index as u32,
1691            );
1692
1693            let state = PoolState {
1694                current_tick: row.get("current_tick"),
1695                price_sqrt_ratio_x96: row.get::<String, _>("price_sqrt_ratio_x96").parse()?,
1696                liquidity: row.get::<String, _>("liquidity").parse()?,
1697                protocol_fees_token0: row.get::<String, _>("protocol_fees_token0").parse()?,
1698                protocol_fees_token1: row.get::<String, _>("protocol_fees_token1").parse()?,
1699                fee_protocol: row.get::<i16, _>("fee_protocol") as u8,
1700                fee_growth_global_0: row.get::<String, _>("fee_growth_global_0").parse()?,
1701                fee_growth_global_1: row.get::<String, _>("fee_growth_global_1").parse()?,
1702            };
1703
1704            let analytics = PoolAnalytics {
1705                total_amount0_deposited: row.get::<String, _>("total_amount0_deposited").parse()?,
1706                total_amount1_deposited: row.get::<String, _>("total_amount1_deposited").parse()?,
1707                total_amount0_collected: row.get::<String, _>("total_amount0_collected").parse()?,
1708                total_amount1_collected: row.get::<String, _>("total_amount1_collected").parse()?,
1709                total_swaps: row.get::<i32, _>("total_swaps") as u64,
1710                total_mints: row.get::<i32, _>("total_mints") as u64,
1711                total_burns: row.get::<i32, _>("total_burns") as u64,
1712                total_fee_collects: row.get::<i32, _>("total_fee_collects") as u64,
1713                total_flashes: row.get::<i32, _>("total_flashes") as u64,
1714                liquidity_utilization_rate: row.get::<f64, _>("liquidity_utilization_rate"),
1715            };
1716
1717            // Load positions and ticks
1718            let positions = self
1719                .load_pool_positions_for_snapshot(
1720                    chain_id,
1721                    pool_address,
1722                    block as u64,
1723                    transaction_index as u32,
1724                    log_index as u32,
1725                )
1726                .await?;
1727
1728            let ticks = self
1729                .load_pool_ticks_for_snapshot(
1730                    chain_id,
1731                    pool_address,
1732                    block as u64,
1733                    transaction_index as u32,
1734                    log_index as u32,
1735                )
1736                .await?;
1737
1738            let dex_name: String = row.get("dex_name");
1739            let chain = nautilus_model::defi::Chain::from_chain_id(chain_id)
1740                .ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {}", chain_id))?;
1741
1742            let dex_type = nautilus_model::defi::DexType::from_dex_name(&dex_name)
1743                .ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {}", dex_name))?;
1744
1745            let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1746                .ok_or_else(|| {
1747                    anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1748                })?;
1749
1750            let instrument_id =
1751                Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_address);
1752
1753            Ok(Some(PoolSnapshot::new(
1754                instrument_id,
1755                state,
1756                positions,
1757                ticks,
1758                analytics,
1759                block_position,
1760            )))
1761        } else {
1762            Ok(None)
1763        }
1764    }
1765
1766    /// Marks a pool snapshot as valid after successful on-chain verification.
1767    ///
1768    /// # Errors
1769    ///
1770    /// Returns an error if the database operation fails.
1771    pub async fn mark_pool_snapshot_valid(
1772        &self,
1773        chain_id: u32,
1774        pool_address: &Address,
1775        block: u64,
1776        transaction_index: u32,
1777        log_index: u32,
1778    ) -> anyhow::Result<()> {
1779        sqlx::query(
1780            r"
1781            UPDATE pool_snapshot
1782            SET is_valid = TRUE
1783            WHERE chain_id = $1
1784            AND pool_address = $2
1785            AND block = $3
1786            AND transaction_index = $4
1787            AND log_index = $5
1788            ",
1789        )
1790        .bind(chain_id as i32)
1791        .bind(pool_address.to_string())
1792        .bind(block as i64)
1793        .bind(transaction_index as i32)
1794        .bind(log_index as i32)
1795        .execute(&self.pool)
1796        .await
1797        .map(|_| ())
1798        .map_err(|e| anyhow::anyhow!("Failed to mark pool snapshot as valid: {}", e))
1799    }
1800
1801    /// Loads all positions for a specific snapshot.
1802    ///
1803    /// # Errors
1804    ///
1805    /// Returns an error if the database query fails.
1806    pub async fn load_pool_positions_for_snapshot(
1807        &self,
1808        chain_id: u32,
1809        pool_address: &Address,
1810        snapshot_block: u64,
1811        snapshot_transaction_index: u32,
1812        snapshot_log_index: u32,
1813    ) -> anyhow::Result<Vec<PoolPosition>> {
1814        let rows = sqlx::query(
1815            r"
1816            SELECT
1817                owner, tick_lower, tick_upper,
1818                liquidity::TEXT, fee_growth_inside_0_last::TEXT, fee_growth_inside_1_last::TEXT,
1819                tokens_owed_0::TEXT, tokens_owed_1::TEXT,
1820                total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
1821                total_amount0_collected::TEXT, total_amount1_collected::TEXT
1822            FROM pool_position
1823            WHERE chain_id = $1
1824            AND pool_address = $2
1825            AND snapshot_block = $3
1826            AND snapshot_transaction_index = $4
1827            AND snapshot_log_index = $5
1828            ",
1829        )
1830        .bind(chain_id as i32)
1831        .bind(pool_address.to_string())
1832        .bind(snapshot_block as i64)
1833        .bind(snapshot_transaction_index as i32)
1834        .bind(snapshot_log_index as i32)
1835        .fetch_all(&self.pool)
1836        .await
1837        .map_err(|e| anyhow::anyhow!("Failed to load pool positions: {}", e))?;
1838
1839        rows.iter()
1840            .map(|row| {
1841                let owner: String = row.get("owner");
1842                let position = PoolPosition {
1843                    owner: validate_address(&owner)?,
1844                    tick_lower: row.get("tick_lower"),
1845                    tick_upper: row.get("tick_upper"),
1846                    liquidity: row.get::<String, _>("liquidity").parse()?,
1847                    fee_growth_inside_0_last: row
1848                        .get::<String, _>("fee_growth_inside_0_last")
1849                        .parse()?,
1850                    fee_growth_inside_1_last: row
1851                        .get::<String, _>("fee_growth_inside_1_last")
1852                        .parse()?,
1853                    tokens_owed_0: row.get::<String, _>("tokens_owed_0").parse()?,
1854                    tokens_owed_1: row.get::<String, _>("tokens_owed_1").parse()?,
1855                    total_amount0_deposited: row
1856                        .get::<String, _>("total_amount0_deposited")
1857                        .parse()?,
1858                    total_amount1_deposited: row
1859                        .get::<String, _>("total_amount1_deposited")
1860                        .parse()?,
1861                    total_amount0_collected: row
1862                        .get::<String, _>("total_amount0_collected")
1863                        .parse()?,
1864                    total_amount1_collected: row
1865                        .get::<String, _>("total_amount1_collected")
1866                        .parse()?,
1867                };
1868                Ok(position)
1869            })
1870            .collect()
1871    }
1872
1873    /// Loads all ticks for a specific snapshot.
1874    ///
1875    /// # Errors
1876    ///
1877    /// Returns an error if the database query fails.
1878    pub async fn load_pool_ticks_for_snapshot(
1879        &self,
1880        chain_id: u32,
1881        pool_address: &Address,
1882        snapshot_block: u64,
1883        snapshot_transaction_index: u32,
1884        snapshot_log_index: u32,
1885    ) -> anyhow::Result<Vec<PoolTick>> {
1886        let rows = sqlx::query(
1887            r"
1888            SELECT
1889                tick_value, liquidity_gross::TEXT, liquidity_net::TEXT,
1890                fee_growth_outside_0::TEXT, fee_growth_outside_1::TEXT, initialized,
1891                last_updated_block
1892            FROM pool_tick
1893            WHERE chain_id = $1
1894            AND pool_address = $2
1895            AND snapshot_block = $3
1896            AND snapshot_transaction_index = $4
1897            AND snapshot_log_index = $5
1898            ",
1899        )
1900        .bind(chain_id as i32)
1901        .bind(pool_address.to_string())
1902        .bind(snapshot_block as i64)
1903        .bind(snapshot_transaction_index as i32)
1904        .bind(snapshot_log_index as i32)
1905        .fetch_all(&self.pool)
1906        .await
1907        .map_err(|e| anyhow::anyhow!("Failed to load pool ticks: {}", e))?;
1908
1909        rows.iter()
1910            .map(|row| {
1911                let tick = PoolTick::new(
1912                    row.get("tick_value"),
1913                    row.get::<String, _>("liquidity_gross").parse()?,
1914                    row.get::<String, _>("liquidity_net").parse()?,
1915                    row.get::<String, _>("fee_growth_outside_0").parse()?,
1916                    row.get::<String, _>("fee_growth_outside_1").parse()?,
1917                    row.get("initialized"),
1918                    row.get::<i64, _>("last_updated_block") as u64,
1919                );
1920                Ok(tick)
1921            })
1922            .collect()
1923    }
1924
1925    /// Streams pool events from all event tables (swap, liquidity, collect) for a specific pool.
1926    ///
1927    /// Creates a unified stream of pool events from multiple tables, ordering them chronologically
1928    /// by block number, transaction index, and log index. Optionally resumes from a specific block position.
1929    ///
1930    /// # Returns
1931    ///
1932    /// A stream of `DexPoolData` events in chronological order.
1933    ///
1934    /// # Errors
1935    ///
1936    /// Returns an error if the database query fails or if event transformation fails.
1937    pub fn stream_pool_events<'a>(
1938        &'a self,
1939        chain: SharedChain,
1940        dex: SharedDex,
1941        instrument_id: InstrumentId,
1942        pool_address: &Address,
1943        from_position: Option<BlockPosition>,
1944    ) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
1945        // Query without position filter (streams all events)
1946        const QUERY_ALL: &str = r"
1947            (SELECT
1948                'swap' as event_type,
1949                chain_id,
1950                pool_address,
1951                block,
1952                transaction_hash,
1953                transaction_index,
1954                log_index,
1955                sender,
1956                recipient,
1957                NULL::TEXT as owner,
1958                sqrt_price_x96::TEXT,
1959                liquidity::TEXT as swap_liquidity,
1960                tick as swap_tick,
1961                amount0::TEXT as swap_amount0,
1962                amount1::TEXT as swap_amount1,
1963                NULL::TEXT as position_liquidity,
1964                NULL::TEXT as amount0,
1965                NULL::TEXT as amount1,
1966                NULL::INT as tick_lower,
1967                NULL::INT as tick_upper,
1968                NULL::TEXT as liquidity_event_type,
1969                NULL::TEXT as flash_amount0,
1970                NULL::TEXT as flash_amount1,
1971                NULL::TEXT as flash_paid0,
1972                NULL::TEXT as flash_paid1
1973            FROM pool_swap_event
1974            WHERE chain_id = $1 AND pool_address = $2)
1975            UNION ALL
1976            (SELECT
1977                'liquidity' as event_type,
1978                chain_id,
1979                pool_address,
1980                block,
1981                transaction_hash,
1982                transaction_index,
1983                log_index,
1984                sender,
1985                NULL::TEXT as recipient,
1986                owner,
1987                NULL::text as sqrt_price_x96,
1988                NULL::TEXT as swap_liquidity,
1989                NULL::INT as swap_tick,
1990                amount0::TEXT as swap_amount0,
1991                amount1::TEXT as swap_amount1,
1992                position_liquidity::TEXT,
1993                amount0::TEXT,
1994                amount1::TEXT,
1995                tick_lower::INT,
1996                tick_upper::INT,
1997                event_type as liquidity_event_type,
1998                NULL::TEXT as flash_amount0,
1999                NULL::TEXT as flash_amount1,
2000                NULL::TEXT as flash_paid0,
2001                NULL::TEXT as flash_paid1
2002            FROM pool_liquidity_event
2003            WHERE chain_id = $1 AND pool_address = $2)
2004            UNION ALL
2005            (SELECT
2006                'collect' as event_type,
2007                chain_id,
2008                pool_address,
2009                block,
2010                transaction_hash,
2011                transaction_index,
2012                log_index,
2013                NULL::TEXT as sender,
2014                NULL::TEXT as recipient,
2015                owner,
2016                NULL::TEXT as sqrt_price_x96,
2017                NULL::TEXT as swap_liquidity,
2018                NULL::INT AS swap_tick,
2019                amount0::TEXT as swap_amount0,
2020                amount1::TEXT as swap_amount1,
2021                NULL::TEXT as position_liquidity,
2022                amount0::TEXT,
2023                amount1::TEXT,
2024                tick_lower::INT,
2025                tick_upper::INT,
2026                NULL::TEXT as liquidity_event_type,
2027                NULL::TEXT as flash_amount0,
2028                NULL::TEXT as flash_amount1,
2029                NULL::TEXT as flash_paid0,
2030                NULL::TEXT as flash_paid1
2031            FROM pool_collect_event
2032            WHERE chain_id = $1 AND pool_address = $2)
2033            UNION ALL
2034            (SELECT
2035                'flash' as event_type,
2036                chain_id,
2037                pool_address,
2038                block,
2039                transaction_hash,
2040                transaction_index,
2041                log_index,
2042                sender,
2043                recipient,
2044                NULL::TEXT as owner,
2045                NULL::TEXT as sqrt_price_x96,
2046                NULL::TEXT as swap_liquidity,
2047                NULL::INT AS swap_tick,
2048                NULL::TEXT as swap_amount0,
2049                NULL::TEXT as swap_amount1,
2050                NULL::TEXT as position_liquidity,
2051                NULL::TEXT as amount0,
2052                NULL::TEXT as amount1,
2053                NULL::INT as tick_lower,
2054                NULL::INT as tick_upper,
2055                NULL::TEXT as liquidity_event_type,
2056                amount0::TEXT as flash_amount0,
2057                amount1::TEXT as flash_amount1,
2058                paid0::TEXT as flash_paid0,
2059                paid1::TEXT as flash_paid1
2060            FROM pool_flash_event
2061            WHERE chain_id = $1 AND pool_address = $2)
2062            ORDER BY block, transaction_index, log_index";
2063
2064        // Query with position filter (resumes from specific block position)
2065        const QUERY_FROM_POSITION: &str = r"
2066            (SELECT
2067                'swap' as event_type,
2068                chain_id,
2069                pool_address,
2070                block,
2071                transaction_hash,
2072                transaction_index,
2073                log_index,
2074                sender,
2075                recipient,
2076                NULL::TEXT as owner,
2077                sqrt_price_x96::TEXT,
2078                liquidity::TEXT as swap_liquidity,
2079                tick as swap_tick,
2080                amount0::TEXT as swap_amount0,
2081                amount1::TEXT as swap_amount1,
2082                NULL::TEXT as position_liquidity,
2083                NULL::TEXT as amount0,
2084                NULL::TEXT as amount1,
2085                NULL::INT as tick_lower,
2086                NULL::INT as tick_upper,
2087                NULL::TEXT as liquidity_event_type,
2088                NULL::TEXT as flash_amount0,
2089                NULL::TEXT as flash_amount1,
2090                NULL::TEXT as flash_paid0,
2091                NULL::TEXT as flash_paid1
2092            FROM pool_swap_event
2093            WHERE chain_id = $1 AND pool_address = $2
2094            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2095            UNION ALL
2096            (SELECT
2097                'liquidity' as event_type,
2098                chain_id,
2099                pool_address,
2100                block,
2101                transaction_hash,
2102                transaction_index,
2103                log_index,
2104                sender,
2105                NULL::TEXT as recipient,
2106                owner,
2107                NULL::text as sqrt_price_x96,
2108                NULL::TEXT as swap_liquidity,
2109                NULL::INT as swap_tick,
2110                amount0::TEXT as swap_amount0,
2111                amount1::TEXT as swap_amount1,
2112                position_liquidity::TEXT,
2113                amount0::TEXT,
2114                amount1::TEXT,
2115                tick_lower::INT,
2116                tick_upper::INT,
2117                event_type as liquidity_event_type,
2118                NULL::TEXT as flash_amount0,
2119                NULL::TEXT as flash_amount1,
2120                NULL::TEXT as flash_paid0,
2121                NULL::TEXT as flash_paid1
2122            FROM pool_liquidity_event
2123            WHERE chain_id = $1 AND pool_address = $2
2124            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2125            UNION ALL
2126            (SELECT
2127                'collect' as event_type,
2128                chain_id,
2129                pool_address,
2130                block,
2131                transaction_hash,
2132                transaction_index,
2133                log_index,
2134                NULL::TEXT as sender,
2135                NULL::TEXT as recipient,
2136                owner,
2137                NULL::TEXT as sqrt_price_x96,
2138                NULL::TEXT as swap_liquidity,
2139                NULL::INT AS swap_tick,
2140                amount0::TEXT as swap_amount0,
2141                amount1::TEXT as swap_amount1,
2142                NULL::TEXT as position_liquidity,
2143                amount0::TEXT,
2144                amount1::TEXT,
2145                tick_lower::INT,
2146                tick_upper::INT,
2147                NULL::TEXT as liquidity_event_type,
2148                NULL::TEXT as flash_amount0,
2149                NULL::TEXT as flash_amount1,
2150                NULL::TEXT as flash_paid0,
2151                NULL::TEXT as flash_paid1
2152            FROM pool_collect_event
2153            WHERE chain_id = $1 AND pool_address = $2
2154            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2155            UNION ALL
2156            (SELECT
2157                'flash' as event_type,
2158                chain_id,
2159                pool_address,
2160                block,
2161                transaction_hash,
2162                transaction_index,
2163                log_index,
2164                sender,
2165                recipient,
2166                NULL::TEXT as owner,
2167                NULL::TEXT as sqrt_price_x96,
2168                NULL::TEXT as swap_liquidity,
2169                NULL::INT AS swap_tick,
2170                NULL::TEXT as swap_amount0,
2171                NULL::TEXT as swap_amount1,
2172                NULL::TEXT as position_liquidity,
2173                NULL::TEXT as amount0,
2174                NULL::TEXT as amount1,
2175                NULL::INT as tick_lower,
2176                NULL::INT as tick_upper,
2177                NULL::TEXT as liquidity_event_type,
2178                amount0::TEXT as flash_amount0,
2179                amount1::TEXT as flash_amount1,
2180                paid0::TEXT as flash_paid0,
2181                paid1::TEXT as flash_paid1
2182            FROM pool_flash_event
2183            WHERE chain_id = $1 AND pool_address = $2
2184            AND (block > $3 OR (block = $3 AND transaction_index > $4) OR (block = $3 AND transaction_index = $4 AND log_index > $5)))
2185            ORDER BY block, transaction_index, log_index";
2186
2187        // Build query with appropriate bindings
2188        let query = if let Some(pos) = from_position {
2189            sqlx::query(QUERY_FROM_POSITION)
2190                .bind(chain.chain_id as i32)
2191                .bind(pool_address.to_string())
2192                .bind(pos.number as i64)
2193                .bind(pos.transaction_index as i32)
2194                .bind(pos.log_index as i32)
2195                .fetch(&self.pool)
2196        } else {
2197            sqlx::query(QUERY_ALL)
2198                .bind(chain.chain_id as i32)
2199                .bind(pool_address.to_string())
2200                .fetch(&self.pool)
2201        };
2202
2203        // Transform rows to events
2204        let stream = query.map(move |row_result| match row_result {
2205            Ok(row) => {
2206                transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2207                    .map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e))
2208            }
2209            Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {}", e)),
2210        });
2211
2212        Box::pin(stream)
2213    }
2214}