nautilus_blockchain/cache/database.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use alloy::primitives::{Address, U256};
use nautilus_model::defi::{
    Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
    data::PoolFeeCollect, validation::validate_address,
};
use sqlx::{PgPool, postgres::PgConnectOptions};

use crate::{
    cache::{
        consistency::CachedBlocksConsistencyStatus,
        copy::PostgresCopyHandler,
        rows::{BlockTimestampRow, PoolRow, TokenRow},
    },
    events::initialize::InitializeEvent,
};

/// Database interface for persisting and retrieving blockchain entities and domain objects.
#[derive(Debug)]
pub struct BlockchainCacheDatabase {
    /// PostgreSQL connection pool used for database operations.
    pool: PgPool,
}

impl BlockchainCacheDatabase {
    /// Initializes a new database instance by establishing a connection to PostgreSQL.
    ///
    /// # Panics
    ///
    /// Panics if unable to connect to PostgreSQL with the provided options.
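    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a locally reachable PostgreSQL instance; the host,
    /// credentials, and database name below are illustrative only:
    ///
    /// ```ignore
    /// use sqlx::postgres::PgConnectOptions;
    ///
    /// let pg_options = PgConnectOptions::new()
    ///     .host("localhost")
    ///     .port(5432)
    ///     .username("nautilus")
    ///     .password("pass")
    ///     .database("nautilus");
    /// let database = BlockchainCacheDatabase::init(pg_options).await;
    /// ```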
    pub async fn init(pg_options: PgConnectOptions) -> Self {
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(32) // Increased from default 10
            .min_connections(5) // Keep some connections warm
            .acquire_timeout(std::time::Duration::from_secs(3))
            .connect_with(pg_options)
            .await
            .expect("Error connecting to Postgres");
        Self { pool }
    }

    /// Seeds the database with a blockchain chain record.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
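    ///
    /// # Examples
    ///
    /// A sketch of a typical bootstrap sequence for a new chain, seeding the chain record
    /// before creating its partitions with the methods below (error handling elided):
    ///
    /// ```ignore
    /// database.seed_chain(&chain).await?;
    /// database.create_block_partition(&chain).await?;
    /// database.create_token_partition(&chain).await?;
    /// ```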
    pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO chain (
                chain_id, name
            ) VALUES ($1,$2)
            ON CONFLICT (chain_id)
            DO NOTHING
        ",
        )
        .bind(chain.chain_id as i32)
        .bind(chain.name.to_string())
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
    }

    /// Creates a table partition for the block table specific to the given chain
    /// by calling the existing PostgreSQL function `create_block_partition`.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn create_block_partition(&self, chain: &Chain) -> anyhow::Result<String> {
        let result: (String,) = sqlx::query_as("SELECT create_block_partition($1)")
            .bind(chain.chain_id as i32)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| {
                anyhow::anyhow!(
                    "Failed to call create_block_partition for chain {}: {e}",
                    chain.chain_id
                )
            })?;

        Ok(result.0)
    }

    /// Creates a table partition for the token table specific to the given chain
    /// by calling the existing PostgreSQL function `create_token_partition`.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn create_token_partition(&self, chain: &Chain) -> anyhow::Result<String> {
        let result: (String,) = sqlx::query_as("SELECT create_token_partition($1)")
            .bind(chain.chain_id as i32)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| {
                anyhow::anyhow!(
                    "Failed to call create_token_partition for chain {}: {e}",
                    chain.chain_id
                )
            })?;

        Ok(result.0)
    }

    /// Returns the cached block consistency status for the given chain, containing the highest
    /// cached block number and the highest block number that maintains data continuity.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
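    ///
    /// # Examples
    ///
    /// A sketch of using the status to decide where to resume syncing; the accessor name on
    /// `CachedBlocksConsistencyStatus` is assumed for illustration:
    ///
    /// ```ignore
    /// let status = database.get_block_consistency_status(&chain).await?;
    /// // Accessor name assumed; resume from the block after the last continuous one.
    /// let resume_from = status.last_continuous_block() + 1;
    /// ```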
    pub async fn get_block_consistency_status(
        &self,
        chain: &Chain,
    ) -> anyhow::Result<CachedBlocksConsistencyStatus> {
        tracing::info!("Fetching block consistency status");

        let result: (i64, i64) = sqlx::query_as(
            r"
            SELECT
                COALESCE((SELECT number FROM block WHERE chain_id = $1 ORDER BY number DESC LIMIT 1), 0) as max_block,
                get_last_continuous_block($1) as last_continuous_block
            "
        )
        .bind(chain.chain_id as i32)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| {
            anyhow::anyhow!(
                "Failed to get block info for chain {}: {}",
                chain.chain_id,
                e
            )
        })?;

        Ok(CachedBlocksConsistencyStatus::new(
            result.0 as u64,
            result.1 as u64,
        ))
    }

    /// Inserts or updates a block record in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO block (
                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
                base_fee_per_gas, blob_gas_used, excess_blob_gas,
                l1_gas_price, l1_gas_used, l1_fee_scalar
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
            )
            ON CONFLICT (chain_id, number)
            DO UPDATE
            SET
                hash = $3,
                parent_hash = $4,
                miner = $5,
                gas_limit = $6,
                gas_used = $7,
                timestamp = $8,
                base_fee_per_gas = $9,
                blob_gas_used = $10,
                excess_blob_gas = $11,
                l1_gas_price = $12,
                l1_gas_used = $13,
                l1_fee_scalar = $14
        ",
        )
        .bind(chain_id as i32)
        .bind(block.number as i64)
        .bind(block.hash.as_str())
        .bind(block.parent_hash.as_str())
        .bind(block.miner.as_str())
        .bind(block.gas_limit as i64)
        .bind(block.gas_used as i64)
        .bind(block.timestamp.to_string())
        .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
        .bind(block.blob_gas_used.as_ref().map(U256::to_string))
        .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
        .bind(block.l1_gas_price.as_ref().map(U256::to_string))
        .bind(block.l1_gas_used.map(|v| v as i64))
        .bind(block.l1_fee_scalar.map(|v| v as i64))
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
    }

    /// Inserts multiple blocks in a single database operation using UNNEST for optimal performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
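    ///
    /// # Examples
    ///
    /// A sketch of inserting blocks in chunks; the chunk size is illustrative and `blocks`
    /// is assumed to have been collected from an upstream data source:
    ///
    /// ```ignore
    /// for chunk in blocks.chunks(1_000) {
    ///     database.add_blocks_batch(chain.chain_id, chunk).await?;
    /// }
    /// ```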
    pub async fn add_blocks_batch(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        // Prepare vectors for each column
        let mut numbers: Vec<i64> = Vec::with_capacity(blocks.len());
        let mut hashes: Vec<String> = Vec::with_capacity(blocks.len());
        let mut parent_hashes: Vec<String> = Vec::with_capacity(blocks.len());
        let mut miners: Vec<String> = Vec::with_capacity(blocks.len());
        let mut gas_limits: Vec<i64> = Vec::with_capacity(blocks.len());
        let mut gas_useds: Vec<i64> = Vec::with_capacity(blocks.len());
        let mut timestamps: Vec<String> = Vec::with_capacity(blocks.len());
        let mut base_fee_per_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
        let mut blob_gas_useds: Vec<Option<String>> = Vec::with_capacity(blocks.len());
        let mut excess_blob_gases: Vec<Option<String>> = Vec::with_capacity(blocks.len());
        let mut l1_gas_prices: Vec<Option<String>> = Vec::with_capacity(blocks.len());
        let mut l1_gas_useds: Vec<Option<i64>> = Vec::with_capacity(blocks.len());
        let mut l1_fee_scalars: Vec<Option<i64>> = Vec::with_capacity(blocks.len());

        // Fill vectors from blocks
        for block in blocks {
            numbers.push(block.number as i64);
            hashes.push(block.hash.clone());
            parent_hashes.push(block.parent_hash.clone());
            miners.push(block.miner.to_string());
            gas_limits.push(block.gas_limit as i64);
            gas_useds.push(block.gas_used as i64);
            timestamps.push(block.timestamp.to_string());
            base_fee_per_gases.push(block.base_fee_per_gas.as_ref().map(U256::to_string));
            blob_gas_useds.push(block.blob_gas_used.as_ref().map(U256::to_string));
            excess_blob_gases.push(block.excess_blob_gas.as_ref().map(U256::to_string));
            l1_gas_prices.push(block.l1_gas_price.as_ref().map(U256::to_string));
            l1_gas_useds.push(block.l1_gas_used.map(|v| v as i64));
            l1_fee_scalars.push(block.l1_fee_scalar.map(|v| v as i64));
        }

        // Execute batch insert with UNNEST
        sqlx::query(
            r"
            INSERT INTO block (
                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
                base_fee_per_gas, blob_gas_used, excess_blob_gas,
                l1_gas_price, l1_gas_used, l1_fee_scalar
            )
            SELECT
                $1, *
            FROM UNNEST(
                $2::int8[], $3::text[], $4::text[], $5::text[],
                $6::int8[], $7::int8[], $8::text[],
                $9::text[], $10::text[], $11::text[],
                $12::text[], $13::int8[], $14::int8[]
            )
            ON CONFLICT (chain_id, number) DO NOTHING
           ",
        )
        .bind(chain_id as i32)
        .bind(&numbers[..])
        .bind(&hashes[..])
        .bind(&parent_hashes[..])
        .bind(&miners[..])
        .bind(&gas_limits[..])
        .bind(&gas_useds[..])
        .bind(&timestamps[..])
        .bind(&base_fee_per_gases as &[Option<String>])
        .bind(&blob_gas_useds as &[Option<String>])
        .bind(&excess_blob_gases as &[Option<String>])
        .bind(&l1_gas_prices as &[Option<String>])
        .bind(&l1_gas_useds as &[Option<i64>])
        .bind(&l1_fee_scalars as &[Option<i64>])
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to batch insert into block table: {e}"))
    }

    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// This method is significantly faster than INSERT for bulk operations as it bypasses
    /// SQL parsing and uses PostgreSQL's native binary protocol.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
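    ///
    /// # Examples
    ///
    /// A sketch of preferring COPY for large backfills and falling back to the UNNEST batch
    /// insert for smaller ones; the threshold is illustrative:
    ///
    /// ```ignore
    /// if blocks.len() >= 10_000 {
    ///     database.add_blocks_copy(chain.chain_id, &blocks).await?;
    /// } else {
    ///     database.add_blocks_batch(chain.chain_id, &blocks).await?;
    /// }
    /// ```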
    pub async fn add_blocks_copy(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
        let copy_handler = PostgresCopyHandler::new(&self.pool);
        copy_handler.copy_blocks(chain_id, blocks).await
    }

    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// This method is significantly faster than INSERT for bulk operations as it bypasses
    /// SQL parsing and uses PostgreSQL's native binary protocol.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn add_pool_swaps_copy(
        &self,
        chain_id: u32,
        swaps: &[PoolSwap],
    ) -> anyhow::Result<()> {
        let copy_handler = PostgresCopyHandler::new(&self.pool);
        copy_handler.copy_pool_swaps(chain_id, swaps).await
    }

    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// This method is significantly faster than INSERT for bulk operations as it bypasses
    /// SQL parsing and uses PostgreSQL's native binary protocol.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn add_pool_liquidity_updates_copy(
        &self,
        chain_id: u32,
        updates: &[PoolLiquidityUpdate],
    ) -> anyhow::Result<()> {
        let copy_handler = PostgresCopyHandler::new(&self.pool);
        copy_handler
            .copy_pool_liquidity_updates(chain_id, updates)
            .await
    }

    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// This method is significantly faster than INSERT for bulk operations as it bypasses
    /// SQL parsing and most database validation checks.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pool_fee_collects_batch(
        &self,
        chain_id: u32,
        collects: &[PoolFeeCollect],
    ) -> anyhow::Result<()> {
        let copy_handler = PostgresCopyHandler::new(&self.pool);
        copy_handler.copy_pool_collects(chain_id, collects).await
    }

    /// Retrieves block timestamps for a given chain starting from a specific block number.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn load_block_timestamps(
        &self,
        chain: SharedChain,
        from_block: u64,
    ) -> anyhow::Result<Vec<BlockTimestampRow>> {
        sqlx::query_as::<_, BlockTimestampRow>(
            r"
            SELECT
                number,
                timestamp
            FROM block
            WHERE chain_id = $1 AND number >= $2
            ORDER BY number ASC
            ",
        )
        .bind(chain.chain_id as i32)
        .bind(from_block as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
    }

    /// Adds or updates a DEX (Decentralized Exchange) record in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_dex(&self, dex: SharedDex) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO dex (
                chain_id, name, factory_address, creation_block
            ) VALUES ($1, $2, $3, $4)
            ON CONFLICT (chain_id, name)
            DO UPDATE
            SET
                factory_address = $3,
                creation_block = $4
        ",
        )
        .bind(dex.chain.chain_id as i32)
        .bind(dex.name.to_string())
        .bind(dex.factory.to_string())
        .bind(dex.factory_creation_block as i64)
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
    }

    /// Adds or updates a liquidity pool/pair record in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO pool (
                chain_id, address, dex_name, creation_block,
                token0_chain, token0_address,
                token1_chain, token1_address,
                fee, tick_spacing
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            ON CONFLICT (chain_id, address)
            DO UPDATE
            SET
                dex_name = $3,
                creation_block = $4,
                token0_chain = $5,
                token0_address = $6,
                token1_chain = $7,
                token1_address = $8,
                fee = $9,
                tick_spacing = $10
        ",
        )
        .bind(pool.chain.chain_id as i32)
        .bind(pool.address.to_string())
        .bind(pool.dex.name.to_string())
        .bind(pool.creation_block as i64)
        .bind(pool.token0.chain.chain_id as i32)
        .bind(pool.token0.address.to_string())
        .bind(pool.token1.chain.chain_id as i32)
        .bind(pool.token1.address.to_string())
        .bind(pool.fee.map(|fee| fee as i32))
        .bind(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32))
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
    }

    /// Inserts multiple pools in a single database operation using UNNEST for optimal performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_pools_batch(&self, pools: &[Pool]) -> anyhow::Result<()> {
        if pools.is_empty() {
            return Ok(());
        }

        // Prepare vectors for each column
        let mut addresses: Vec<String> = Vec::with_capacity(pools.len());
        let mut dex_names: Vec<String> = Vec::with_capacity(pools.len());
        let mut creation_blocks: Vec<i64> = Vec::with_capacity(pools.len());
        let mut token0_chains: Vec<i32> = Vec::with_capacity(pools.len());
        let mut token0_addresses: Vec<String> = Vec::with_capacity(pools.len());
        let mut token1_chains: Vec<i32> = Vec::with_capacity(pools.len());
        let mut token1_addresses: Vec<String> = Vec::with_capacity(pools.len());
        let mut fees: Vec<Option<i32>> = Vec::with_capacity(pools.len());
        let mut tick_spacings: Vec<Option<i32>> = Vec::with_capacity(pools.len());
        let mut chain_ids: Vec<i32> = Vec::with_capacity(pools.len());

        // Fill vectors from pools
        for pool in pools {
            chain_ids.push(pool.chain.chain_id as i32);
            addresses.push(pool.address.to_string());
            dex_names.push(pool.dex.name.to_string());
            creation_blocks.push(pool.creation_block as i64);
            token0_chains.push(pool.token0.chain.chain_id as i32);
            token0_addresses.push(pool.token0.address.to_string());
            token1_chains.push(pool.token1.chain.chain_id as i32);
            token1_addresses.push(pool.token1.address.to_string());
            fees.push(pool.fee.map(|fee| fee as i32));
            tick_spacings.push(pool.tick_spacing.map(|tick_spacing| tick_spacing as i32));
        }

        // Execute batch insert with UNNEST
        sqlx::query(
            r"
            INSERT INTO pool (
                chain_id, address, dex_name, creation_block,
                token0_chain, token0_address,
                token1_chain, token1_address,
                fee, tick_spacing
            )
            SELECT *
            FROM UNNEST(
                $1::int4[], $2::text[], $3::text[], $4::int8[],
                $5::int4[], $6::text[], $7::int4[], $8::text[],
                $9::int4[], $10::int4[]
            )
            ON CONFLICT (chain_id, address) DO NOTHING
           ",
        )
        .bind(&chain_ids[..])
        .bind(&addresses[..])
        .bind(&dex_names[..])
        .bind(&creation_blocks[..])
        .bind(&token0_chains[..])
        .bind(&token0_addresses[..])
        .bind(&token1_chains[..])
        .bind(&token1_addresses[..])
        .bind(&fees[..])
        .bind(&tick_spacings[..])
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool table: {e}"))
    }

    /// Inserts multiple pool swaps in a single database operation using UNNEST for optimal performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_pool_swaps_batch(
        &self,
        chain_id: u32,
        swaps: &[PoolSwap],
    ) -> anyhow::Result<()> {
        if swaps.is_empty() {
            return Ok(());
        }

        // Prepare vectors for each column
        let mut pool_addresses: Vec<String> = Vec::with_capacity(swaps.len());
        let mut blocks: Vec<i64> = Vec::with_capacity(swaps.len());
        let mut transaction_hashes: Vec<String> = Vec::with_capacity(swaps.len());
        let mut transaction_indices: Vec<i32> = Vec::with_capacity(swaps.len());
        let mut log_indices: Vec<i32> = Vec::with_capacity(swaps.len());
        let mut senders: Vec<String> = Vec::with_capacity(swaps.len());
        let mut sides: Vec<String> = Vec::with_capacity(swaps.len());
        let mut sizes: Vec<String> = Vec::with_capacity(swaps.len());
        let mut prices: Vec<String> = Vec::with_capacity(swaps.len());
        let mut chain_ids: Vec<i32> = Vec::with_capacity(swaps.len());

        // Fill vectors from swaps
        for swap in swaps {
            chain_ids.push(chain_id as i32);
            pool_addresses.push(swap.pool_address.to_string());
            blocks.push(swap.block as i64);
            transaction_hashes.push(swap.transaction_hash.clone());
            transaction_indices.push(swap.transaction_index as i32);
            log_indices.push(swap.log_index as i32);
            senders.push(swap.sender.to_string());
            sides.push(swap.side.to_string());
            sizes.push(swap.size.to_string());
            prices.push(swap.price.to_string());
        }

        // Execute batch insert with UNNEST
        sqlx::query(
            r"
            INSERT INTO pool_swap_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, sender, side, size, price
            )
            SELECT *
            FROM UNNEST(
                $1::int4[], $2::text[], $3::int8[], $4::text[], $5::int4[],
                $6::int4[], $7::text[], $8::text[], $9::text[], $10::text[]
            )
            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
           ",
        )
        .bind(&chain_ids[..])
        .bind(&pool_addresses[..])
        .bind(&blocks[..])
        .bind(&transaction_hashes[..])
        .bind(&transaction_indices[..])
        .bind(&log_indices[..])
        .bind(&senders[..])
        .bind(&sides[..])
        .bind(&sizes[..])
        .bind(&prices[..])
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_swap_event table: {e}"))
    }

    /// Inserts multiple pool liquidity updates in a single database operation using UNNEST for optimal performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_pool_liquidity_updates_batch(
        &self,
        chain_id: u32,
        updates: &[PoolLiquidityUpdate],
    ) -> anyhow::Result<()> {
        if updates.is_empty() {
            return Ok(());
        }

        // Prepare vectors for each column
        let mut pool_addresses: Vec<String> = Vec::with_capacity(updates.len());
        let mut blocks: Vec<i64> = Vec::with_capacity(updates.len());
        let mut transaction_hashes: Vec<String> = Vec::with_capacity(updates.len());
        let mut transaction_indices: Vec<i32> = Vec::with_capacity(updates.len());
        let mut log_indices: Vec<i32> = Vec::with_capacity(updates.len());
        let mut event_types: Vec<String> = Vec::with_capacity(updates.len());
        let mut senders: Vec<Option<String>> = Vec::with_capacity(updates.len());
        let mut owners: Vec<String> = Vec::with_capacity(updates.len());
        let mut position_liquidities: Vec<String> = Vec::with_capacity(updates.len());
        let mut amount0s: Vec<String> = Vec::with_capacity(updates.len());
        let mut amount1s: Vec<String> = Vec::with_capacity(updates.len());
        let mut tick_lowers: Vec<i32> = Vec::with_capacity(updates.len());
        let mut tick_uppers: Vec<i32> = Vec::with_capacity(updates.len());
        let mut chain_ids: Vec<i32> = Vec::with_capacity(updates.len());

        // Fill vectors from updates
        for update in updates {
            chain_ids.push(chain_id as i32);
            pool_addresses.push(update.pool_address.to_string());
            blocks.push(update.block as i64);
            transaction_hashes.push(update.transaction_hash.clone());
            transaction_indices.push(update.transaction_index as i32);
            log_indices.push(update.log_index as i32);
            event_types.push(update.kind.to_string());
            senders.push(update.sender.map(|s| s.to_string()));
            owners.push(update.owner.to_string());
            position_liquidities.push(update.position_liquidity.to_string());
            amount0s.push(update.amount0.to_string());
            amount1s.push(update.amount1.to_string());
            tick_lowers.push(update.tick_lower);
            tick_uppers.push(update.tick_upper);
        }

        // Execute batch insert with UNNEST
        sqlx::query(
            r"
            INSERT INTO pool_liquidity_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, event_type, sender, owner, position_liquidity,
                amount0, amount1, tick_lower, tick_upper
            )
            SELECT *
            FROM UNNEST(
                $1::int4[], $2::text[], $3::int8[], $4::text[], $5::int4[],
                $6::int4[], $7::text[], $8::text[], $9::text[], $10::text[],
                $11::text[], $12::text[], $13::int4[], $14::int4[]
            )
            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
           ",
        )
        .bind(&chain_ids[..])
        .bind(&pool_addresses[..])
        .bind(&blocks[..])
        .bind(&transaction_hashes[..])
        .bind(&transaction_indices[..])
        .bind(&log_indices[..])
        .bind(&event_types[..])
        .bind(&senders[..])
        .bind(&owners[..])
        .bind(&position_liquidities[..])
        .bind(&amount0s[..])
        .bind(&amount1s[..])
        .bind(&tick_lowers[..])
        .bind(&tick_uppers[..])
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_liquidity_event table: {e}"))
    }

    /// Adds or updates a token record in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO token (
                chain_id, address, name, symbol, decimals
            ) VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (chain_id, address)
            DO UPDATE
            SET
                name = $3,
                symbol = $4,
                decimals = $5
        ",
        )
        .bind(token.chain.chain_id as i32)
        .bind(token.address.to_string())
        .bind(token.name.as_str())
        .bind(token.symbol.as_str())
        .bind(i32::from(token.decimals))
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
    }

    /// Records an invalid token address with associated error information.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insertion fails.
    pub async fn add_invalid_token(
        &self,
        chain_id: u32,
        address: &Address,
        error_string: &str,
    ) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO token (
                chain_id, address, error
            ) VALUES ($1, $2, $3)
            ON CONFLICT (chain_id, address)
            DO NOTHING;
        ",
        )
        .bind(chain_id as i32)
        .bind(address.to_string())
        .bind(error_string)
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
    }

    /// Persists a token swap transaction event to the `pool_swap_event` table.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO pool_swap_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, sender, side, size, price
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            ON CONFLICT (chain_id, transaction_hash, log_index)
            DO NOTHING
        ",
        )
        .bind(chain_id as i32)
        .bind(swap.pool_address.to_string())
        .bind(swap.block as i64)
        .bind(swap.transaction_hash.as_str())
        .bind(swap.transaction_index as i32)
        .bind(swap.log_index as i32)
        .bind(swap.sender.to_string())
        .bind(swap.side.to_string())
        .bind(swap.size.to_string())
        .bind(swap.price.to_string())
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap_event table: {e}"))
    }

    /// Persists a liquidity position change (mint/burn) event to the `pool_liquidity_event` table.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_pool_liquidity_update(
        &self,
        chain_id: u32,
        liquidity_update: &PoolLiquidityUpdate,
    ) -> anyhow::Result<()> {
        sqlx::query(
            r"
            INSERT INTO pool_liquidity_event (
                chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
                event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
            ON CONFLICT (chain_id, transaction_hash, log_index)
            DO NOTHING
        ",
        )
        .bind(chain_id as i32)
        .bind(liquidity_update.pool_address.to_string())
        .bind(liquidity_update.block as i64)
        .bind(liquidity_update.transaction_hash.as_str())
        .bind(liquidity_update.transaction_index as i32)
        .bind(liquidity_update.log_index as i32)
        .bind(liquidity_update.kind.to_string())
        .bind(liquidity_update.sender.map(|sender| sender.to_string()))
        .bind(liquidity_update.owner.to_string())
        .bind(liquidity_update.position_liquidity.to_string())
        .bind(liquidity_update.amount0.to_string())
        .bind(liquidity_update.amount1.to_string())
        .bind(liquidity_update.tick_lower)
        .bind(liquidity_update.tick_upper)
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity_event table: {e}"))
    }

    /// Retrieves all valid token records for the given chain and converts them into `Token` domain objects.
    ///
    /// Only returns tokens that do not contain error information, filtering out invalid tokens
    /// that were previously recorded with error details.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
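    ///
    /// # Examples
    ///
    /// A sketch of loading the cached tokens into an address-keyed map, assuming `Token`
    /// exposes an `address` field:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let tokens = database.load_tokens(chain.clone()).await?;
    /// let tokens_by_address: HashMap<_, _> = tokens
    ///     .into_iter()
    ///     .map(|token| (token.address, token))
    ///     .collect();
    /// ```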
    pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
        sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1 AND error IS NULL")
            .bind(chain.chain_id as i32)
            .fetch_all(&self.pool)
            .await
            .map(|rows| {
                rows.into_iter()
                    .map(|token_row| {
                        Token::new(
                            chain.clone(),
                            token_row.address,
                            token_row.name,
                            token_row.symbol,
                            token_row.decimals as u8,
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
    }

    /// Retrieves all invalid token addresses for a given chain.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails or address validation fails.
    pub async fn load_invalid_token_addresses(
        &self,
        chain_id: u32,
    ) -> anyhow::Result<Vec<Address>> {
        sqlx::query_as::<_, (String,)>(
            "SELECT address FROM token WHERE chain_id = $1 AND error IS NOT NULL",
        )
        .bind(chain_id as i32)
        .fetch_all(&self.pool)
        .await?
        .into_iter()
        .map(|(address,)| validate_address(&address))
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| anyhow::anyhow!("Failed to load invalid token addresses: {e}"))
    }

    /// Loads pool data from the database for the specified chain and DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails, the connection to the database is lost, or the query parameters are invalid.
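    ///
    /// # Examples
    ///
    /// A sketch of loading the cached pools for one DEX; the `dex_id` value is illustrative:
    ///
    /// ```ignore
    /// let pool_rows = database.load_pools(chain.clone(), "UniswapV3").await?;
    /// tracing::info!("Loaded {} cached pools", pool_rows.len());
    /// ```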
    pub async fn load_pools(
        &self,
        chain: SharedChain,
        dex_id: &str,
    ) -> anyhow::Result<Vec<PoolRow>> {
        sqlx::query_as::<_, PoolRow>(
            r"
            SELECT
                address,
                dex_name,
                creation_block,
                token0_chain,
                token0_address,
                token1_chain,
                token1_address,
                fee,
                tick_spacing
            FROM pool
            WHERE chain_id = $1 AND dex_name = $2
            ORDER BY creation_block ASC
        ",
        )
        .bind(chain.chain_id as i32)
        .bind(dex_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to load pools: {e}"))
    }

    /// Toggles performance optimization settings for sync operations.
    ///
    /// When enabled (true), applies settings for maximum write performance:
    /// - `synchronous_commit` = OFF
    /// - `work_mem` increased for bulk operations
    ///
    /// When disabled (false), restores default safe settings:
    /// - `synchronous_commit` = ON (data safety)
    /// - `work_mem` back to default
    ///
    /// # Errors
    ///
    /// Returns an error if the database operations fail.
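    ///
    /// # Examples
    ///
    /// A sketch of wrapping a bulk backfill with the performance settings and restoring the
    /// safe defaults afterwards:
    ///
    /// ```ignore
    /// database.toggle_perf_sync_settings(true).await?;
    /// database.add_blocks_copy(chain.chain_id, &blocks).await?;
    /// database.toggle_perf_sync_settings(false).await?;
    /// ```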
    pub async fn toggle_perf_sync_settings(&self, enable: bool) -> anyhow::Result<()> {
        if enable {
            tracing::info!("Enabling performance sync settings for bulk operations");

            // Set synchronous_commit to OFF for maximum write performance
            sqlx::query("SET synchronous_commit = OFF")
                .execute(&self.pool)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit OFF: {e}"))?;

            // Increase work_mem for bulk operations
            sqlx::query("SET work_mem = '256MB'")
                .execute(&self.pool)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to set work_mem: {e}"))?;

            tracing::debug!("Performance settings enabled: synchronous_commit=OFF, work_mem=256MB");
        } else {
            tracing::info!("Restoring default safe database performance settings");

            // Restore synchronous_commit to ON for data safety
            sqlx::query("SET synchronous_commit = ON")
                .execute(&self.pool)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to set synchronous_commit ON: {e}"))?;

            // Reset work_mem to default
            sqlx::query("RESET work_mem")
                .execute(&self.pool)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to reset work_mem: {e}"))?;
        }

        Ok(())
    }

    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn update_dex_last_synced_block(
        &self,
        chain_id: u32,
        dex: &DexType,
        block_number: u64,
    ) -> anyhow::Result<()> {
        sqlx::query(
            r"
            UPDATE dex
            SET last_full_sync_pools_block_number = $3
            WHERE chain_id = $1 AND name = $2
            ",
        )
        .bind(chain_id as i32)
        .bind(dex.to_string())
        .bind(block_number as i64)
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to update dex last synced block: {e}"))
    }

    /// Saves the checkpoint block number indicating the last completed synchronization for a specific pool.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn update_pool_last_synced_block(
        &self,
        chain_id: u32,
        dex: &DexType,
        pool_address: &Address,
        block_number: u64,
    ) -> anyhow::Result<()> {
        sqlx::query(
            r"
            UPDATE pool
            SET last_full_sync_block_number = $4
            WHERE chain_id = $1
            AND dex_name = $2
            AND address = $3
            ",
        )
        .bind(chain_id as i32)
        .bind(dex.to_string())
        .bind(pool_address.to_string())
        .bind(block_number as i64)
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to update pool last synced block: {e}"))
    }

    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
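    ///
    /// # Examples
    ///
    /// A sketch of resuming a pool sync from the saved checkpoint, falling back to the DEX
    /// factory creation block; `dex_type` and `dex` are assumed to be in scope:
    ///
    /// ```ignore
    /// let start_block = database
    ///     .get_dex_last_synced_block(chain.chain_id, &dex_type)
    ///     .await?
    ///     .map(|block| block + 1)
    ///     .unwrap_or(dex.factory_creation_block);
    /// ```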
    pub async fn get_dex_last_synced_block(
        &self,
        chain_id: u32,
        dex: &DexType,
    ) -> anyhow::Result<Option<u64>> {
        let result = sqlx::query_as::<_, (Option<i64>,)>(
            r#"
            SELECT
                last_full_sync_pools_block_number
            FROM dex
            WHERE chain_id = $1
            AND name = $2
            "#,
        )
        .bind(chain_id as i32)
        .bind(dex.to_string())
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get dex last synced block: {e}"))?;

        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
    }

    /// Retrieves the saved checkpoint block number from the last completed synchronization for a specific pool.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn get_pool_last_synced_block(
        &self,
        chain_id: u32,
        dex: &DexType,
        pool_address: &Address,
    ) -> anyhow::Result<Option<u64>> {
        let result = sqlx::query_as::<_, (Option<i64>,)>(
            r#"
            SELECT
                last_full_sync_block_number
            FROM pool
            WHERE chain_id = $1
            AND dex_name = $2
            AND address = $3
            "#,
        )
        .bind(chain_id as i32)
        .bind(dex.to_string())
        .bind(pool_address.to_string())
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get pool last synced block: {e}"))?;

        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
    }

    /// Retrieves the maximum block number from a specific table for a given pool.
    ///
    /// This is useful to detect orphaned data where events were inserted but progress wasn't updated.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
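    ///
    /// # Examples
    ///
    /// A sketch of comparing the recorded checkpoint against the newest persisted swap event to
    /// detect orphaned rows; `dex_type` and `pool_address` are assumed to be in scope:
    ///
    /// ```ignore
    /// let checkpoint = database
    ///     .get_pool_last_synced_block(chain.chain_id, &dex_type, &pool_address)
    ///     .await?;
    /// let last_event_block = database
    ///     .get_table_last_block(chain.chain_id, "pool_swap_event", &pool_address)
    ///     .await?;
    /// if last_event_block > checkpoint {
    ///     // Events exist beyond the checkpoint; the sync progress was not fully recorded.
    /// }
    /// ```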
    pub async fn get_table_last_block(
        &self,
        chain_id: u32,
        table_name: &str,
        pool_address: &Address,
    ) -> anyhow::Result<Option<u64>> {
        let query = format!(
            "SELECT MAX(block) FROM {} WHERE chain_id = $1 AND pool_address = $2",
            table_name
        );
        let result = sqlx::query_as::<_, (Option<i64>,)>(query.as_str())
            .bind(chain_id as i32)
            .bind(pool_address.to_string())
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| {
                anyhow::anyhow!("Failed to get table last block for {}: {e}", table_name)
            })?;

        Ok(result.and_then(|(block_number,)| block_number.map(|b| b as u64)))
    }

    /// Inserts multiple pool fee collect events in a single database operation using UNNEST for optimal performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn add_pool_collects_batch(
        &self,
        chain_id: u32,
        collects: &[PoolFeeCollect],
    ) -> anyhow::Result<()> {
        if collects.is_empty() {
            return Ok(());
        }

        // Prepare vectors for each column
        let mut pool_addresses: Vec<String> = Vec::with_capacity(collects.len());
        let mut blocks: Vec<i64> = Vec::with_capacity(collects.len());
        let mut transaction_hashes: Vec<String> = Vec::with_capacity(collects.len());
        let mut transaction_indices: Vec<i32> = Vec::with_capacity(collects.len());
        let mut log_indices: Vec<i32> = Vec::with_capacity(collects.len());
        let mut owners: Vec<String> = Vec::with_capacity(collects.len());
        let mut fee0s: Vec<String> = Vec::with_capacity(collects.len());
        let mut fee1s: Vec<String> = Vec::with_capacity(collects.len());
        let mut tick_lowers: Vec<i32> = Vec::with_capacity(collects.len());
        let mut tick_uppers: Vec<i32> = Vec::with_capacity(collects.len());
        let mut chain_ids: Vec<i32> = Vec::with_capacity(collects.len());

        // Fill vectors from collects
        for collect in collects {
            chain_ids.push(chain_id as i32);
            pool_addresses.push(collect.pool_address.to_string());
            blocks.push(collect.block as i64);
            transaction_hashes.push(collect.transaction_hash.clone());
            transaction_indices.push(collect.transaction_index as i32);
            log_indices.push(collect.log_index as i32);
            owners.push(collect.owner.to_string());
            fee0s.push(collect.fee0.to_string());
            fee1s.push(collect.fee1.to_string());
            tick_lowers.push(collect.tick_lower);
            tick_uppers.push(collect.tick_upper);
        }

        // Execute batch insert with UNNEST
        sqlx::query(
            r"
            INSERT INTO pool_collect_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, owner, fee0, fee1, tick_lower, tick_upper
            )
            SELECT *
            FROM UNNEST(
                $1::int4[], $2::text[], $3::int8[], $4::text[], $5::int4[],
                $6::int4[], $7::text[], $8::text[], $9::text[], $10::int4[], $11::int4[]
            )
            ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
           ",
        )
        .bind(&chain_ids[..])
        .bind(&pool_addresses[..])
        .bind(&blocks[..])
        .bind(&transaction_hashes[..])
        .bind(&transaction_indices[..])
        .bind(&log_indices[..])
        .bind(&owners[..])
        .bind(&fee0s[..])
        .bind(&fee1s[..])
        .bind(&tick_lowers[..])
        .bind(&tick_uppers[..])
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to batch insert into pool_collect_event table: {e}"))
    }

    /// Updates the pool record with the initial tick and sqrt price from the pool's initialization event.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn update_pool_initial_price_tick(
        &self,
        chain_id: u32,
        initialize_event: &InitializeEvent,
    ) -> anyhow::Result<()> {
        sqlx::query(
            r"
            UPDATE pool
            SET
                initial_tick = $4,
                initial_sqrt_price_x96 = $5
            WHERE chain_id = $1
            AND dex_name = $2
            AND address = $3
            ",
        )
        .bind(chain_id as i32)
        .bind(initialize_event.dex.name.to_string())
        .bind(initialize_event.pool_address.to_string())
        .bind(initialize_event.tick)
        .bind(initialize_event.sqrt_price_x96.to_string())
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!("Failed to update pool initial price and tick: {e}"))
    }
}