nautilus_blockchain/cache/
copy.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! PostgreSQL COPY BINARY operations for high-performance bulk data loading.
17//!
18//! This module provides utilities for using PostgreSQL's COPY command with binary format,
19//! which offers significantly better performance than standard INSERT operations for bulk data loading.
20
21use nautilus_model::defi::{
22    Block, Pool, PoolLiquidityUpdate, PoolSwap, Token, data::PoolFeeCollect,
23};
24use sqlx::{PgPool, postgres::PgPoolCopyExt};
25
26// Helper to convert scientific notation to decimal
27fn format_scientific_to_decimal(s: &str) -> String {
28    use std::str::FromStr;
29
30    use rust_decimal::Decimal;
31
32    match Decimal::from_str(s) {
33        Ok(decimal) => decimal.to_string(),
34        Err(_) => s.to_string(), // Fallback
35    }
36}
37
38/// Formats a numeric value for PostgreSQL NUMERIC type
39fn format_numeric<T: ToString>(value: &T) -> String {
40    let s = value.to_string();
41
42    // Remove any '+' prefix
43    let s = s.trim_start_matches('+');
44
45    // Handle scientific notation by converting to decimal
46    if s.contains('e') || s.contains('E') {
47        return format_scientific_to_decimal(s);
48    }
49
50    // For very large numbers that rust_decimal can't handle,
51    // just return the cleaned string since alloy_primitives already
52    // produces clean decimal format
53    s.to_string()
54}
55
/// Handles PostgreSQL COPY BINARY operations for blockchain data.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
    // Borrowed connection pool used to start COPY ... FROM STDIN operations
    pool: &'a PgPool,
}
61
62impl<'a> PostgresCopyHandler<'a> {
    /// Creates a new COPY handler with a reference to the database pool.
    ///
    /// Only a reference is stored, so constructing a handler per batch is cheap.
    #[must_use]
    pub const fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }
68
69    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
70    ///
71    /// This method is significantly faster than INSERT for bulk operations as it bypasses
72    /// SQL parsing and uses PostgreSQL's native binary protocol.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the COPY operation fails.
77    pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
78        if blocks.is_empty() {
79            return Ok(());
80        }
81
82        let copy_statement = r"
83            COPY block (
84                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
85                base_fee_per_gas, blob_gas_used, excess_blob_gas,
86                l1_gas_price, l1_gas_used, l1_fee_scalar
87            ) FROM STDIN WITH (FORMAT BINARY)";
88
89        let mut copy_in = self
90            .pool
91            .copy_in_raw(copy_statement)
92            .await
93            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
94
95        // Write binary header
96        self.write_copy_header(&mut copy_in).await?;
97
98        // Write each block as binary data
99        for block in blocks {
100            self.write_block_binary(&mut copy_in, chain_id, block)
101                .await?;
102        }
103
104        // Write binary trailer
105        self.write_copy_trailer(&mut copy_in).await?;
106
107        // Finish the COPY operation
108        copy_in
109            .finish()
110            .await
111            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
112
113        Ok(())
114    }
115
116    /// Inserts tokens using PostgreSQL COPY BINARY for maximum performance.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the COPY operation fails.
121    pub async fn copy_tokens(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
122        if tokens.is_empty() {
123            return Ok(());
124        }
125
126        let copy_statement = r"
127            COPY token (
128                chain_id, address, name, symbol, decimals
129            ) FROM STDIN WITH (FORMAT BINARY)";
130
131        let mut copy_in = self
132            .pool
133            .copy_in_raw(copy_statement)
134            .await
135            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
136
137        self.write_copy_header(&mut copy_in).await?;
138        for token in tokens {
139            self.write_token_binary(&mut copy_in, chain_id, token)
140                .await?;
141        }
142        self.write_copy_trailer(&mut copy_in).await?;
143        copy_in
144            .finish()
145            .await
146            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
147
148        Ok(())
149    }
150
151    /// Inserts pools using PostgreSQL COPY BINARY for maximum performance.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the COPY operation fails.
156    pub async fn copy_pools(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
157        if pools.is_empty() {
158            return Ok(());
159        }
160
161        let copy_statement = r"
162            COPY pool (
163                chain_id, dex_name, address, creation_block,
164                token0_chain, token0_address, token1_chain, token1_address,
165                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
166            ) FROM STDIN WITH (FORMAT BINARY)";
167
168        let mut copy_in = self
169            .pool
170            .copy_in_raw(copy_statement)
171            .await
172            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
173
174        self.write_copy_header(&mut copy_in).await?;
175        for pool in pools {
176            self.write_pool_binary(&mut copy_in, chain_id, pool).await?;
177        }
178        self.write_copy_trailer(&mut copy_in).await?;
179        copy_in
180            .finish()
181            .await
182            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
183
184        Ok(())
185    }
186
187    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the COPY operation fails.
192    pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
193        if swaps.is_empty() {
194            return Ok(());
195        }
196
197        let copy_statement = r"
198            COPY pool_swap_event (
199                chain_id, pool_address, block, transaction_hash, transaction_index,
200                log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
201            ) FROM STDIN WITH (FORMAT BINARY)";
202
203        let mut copy_in = self
204            .pool
205            .copy_in_raw(copy_statement)
206            .await
207            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
208
209        // Write binary header
210        self.write_copy_header(&mut copy_in).await?;
211
212        // Write each swap as binary data
213        for swap in swaps {
214            self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
215                .await?;
216        }
217
218        // Write binary trailer
219        self.write_copy_trailer(&mut copy_in).await?;
220
221        // Finish the COPY operation
222        copy_in.finish().await.map_err(|e| {
223            // Log detailed information about the failed batch
224            tracing::error!("COPY operation failed for pool_swap batch:");
225            tracing::error!("  Chain ID: {}", chain_id);
226            tracing::error!("  Batch size: {}", swaps.len());
227
228            if !swaps.is_empty() {
229                tracing::error!(
230                    "  Block range: {} to {}",
231                    swaps.iter().map(|s| s.block).min().unwrap_or(0),
232                    swaps.iter().map(|s| s.block).max().unwrap_or(0)
233                );
234            }
235
236            // Log first few swaps with key details
237            for (i, swap) in swaps.iter().take(5).enumerate() {
238                tracing::error!(
239                    "  Swap[{}]: tx={} log_idx={} block={} pool={}",
240                    i,
241                    swap.transaction_hash,
242                    swap.log_index,
243                    swap.block,
244                    swap.pool_address
245                );
246            }
247
248            if swaps.len() > 5 {
249                tracing::error!("  ... and {} more swaps", swaps.len() - 5);
250            }
251
252            anyhow::anyhow!("Failed to finish COPY operation: {e}")
253        })?;
254
255        Ok(())
256    }
257
258    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the COPY operation fails.
263    pub async fn copy_pool_liquidity_updates(
264        &self,
265        chain_id: u32,
266        updates: &[PoolLiquidityUpdate],
267    ) -> anyhow::Result<()> {
268        if updates.is_empty() {
269            return Ok(());
270        }
271
272        let copy_statement = r"
273            COPY pool_liquidity_event (
274                chain_id, pool_address, block, transaction_hash, transaction_index,
275                log_index, event_type, sender, owner, position_liquidity,
276                amount0, amount1, tick_lower, tick_upper
277            ) FROM STDIN WITH (FORMAT BINARY)";
278
279        let mut copy_in = self
280            .pool
281            .copy_in_raw(copy_statement)
282            .await
283            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
284
285        // Write binary header
286        self.write_copy_header(&mut copy_in).await?;
287
288        // Write each liquidity update as binary data
289        for update in updates {
290            self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
291                .await?;
292        }
293
294        // Write binary trailer
295        self.write_copy_trailer(&mut copy_in).await?;
296
297        // Finish the COPY operation
298        copy_in.finish().await.map_err(|e| {
299            // Log detailed information about the failed batch
300            tracing::error!("COPY operation failed for pool_liquidity batch:");
301            tracing::error!("  Chain ID: {}", chain_id);
302            tracing::error!("  Batch size: {}", updates.len());
303
304            if !updates.is_empty() {
305                tracing::error!(
306                    "  Block range: {} to {}",
307                    updates.iter().map(|u| u.block).min().unwrap_or(0),
308                    updates.iter().map(|u| u.block).max().unwrap_or(0)
309                );
310            }
311
312            // Log first few liquidity updates with key details
313            for (i, update) in updates.iter().take(5).enumerate() {
314                tracing::error!(
315                    "  Update[{}]: tx={} log_idx={} block={} pool={} type={}",
316                    i,
317                    update.transaction_hash,
318                    update.log_index,
319                    update.block,
320                    update.pool_address,
321                    update.kind
322                );
323            }
324
325            if updates.len() > 5 {
326                tracing::error!("  ... and {} more updates", updates.len() - 5);
327            }
328
329            anyhow::anyhow!("Failed to finish COPY operation: {e}")
330        })?;
331
332        Ok(())
333    }
334
335    /// Writes the PostgreSQL COPY binary format header.
336    ///
337    /// The header consists of:
338    /// - 11-byte signature: "PGCOPY\n\xff\r\n\0"
339    /// - 4-byte flags field (all zeros)
340    /// - 4-byte header extension length (all zeros)
341    async fn write_copy_header(
342        &self,
343        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
344    ) -> anyhow::Result<()> {
345        use std::io::Write;
346        let mut header = Vec::new();
347
348        // PostgreSQL binary copy header
349        header.write_all(b"PGCOPY\n\xff\r\n\0")?; // Signature
350        header.write_all(&[0, 0, 0, 0])?; // Flags field
351        header.write_all(&[0, 0, 0, 0])?; // Header extension length
352
353        copy_in
354            .send(header)
355            .await
356            .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
357        Ok(())
358    }
359
360    /// Writes a single block in PostgreSQL binary format.
361    ///
362    /// Each row in binary format consists of:
363    /// - 2-byte field count
364    /// - For each field: 4-byte length followed by data (or -1 for NULL)
365    async fn write_block_binary(
366        &self,
367        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
368        chain_id: u32,
369        block: &Block,
370    ) -> anyhow::Result<()> {
371        use std::io::Write;
372        let mut row_data = Vec::new();
373
374        // Number of fields (14)
375        row_data.write_all(&14u16.to_be_bytes())?;
376
377        // Field 1: chain_id (INT4)
378        let chain_id_bytes = (chain_id as i32).to_be_bytes();
379        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
380        row_data.write_all(&chain_id_bytes)?;
381
382        // Field 2: number (INT8)
383        let number_bytes = (block.number as i64).to_be_bytes();
384        row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
385        row_data.write_all(&number_bytes)?;
386
387        // Field 3: hash (TEXT)
388        let hash_bytes = block.hash.as_bytes();
389        row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
390        row_data.write_all(hash_bytes)?;
391
392        // Field 4: parent_hash (TEXT)
393        let parent_hash_bytes = block.parent_hash.as_bytes();
394        row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
395        row_data.write_all(parent_hash_bytes)?;
396
397        // Field 5: miner (TEXT)
398        let miner_bytes = block.miner.to_string().as_bytes().to_vec();
399        row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
400        row_data.write_all(&miner_bytes)?;
401
402        // Field 6: gas_limit (INT8)
403        let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
404        row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
405        row_data.write_all(&gas_limit_bytes)?;
406
407        // Field 7: gas_used (INT8)
408        let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
409        row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
410        row_data.write_all(&gas_used_bytes)?;
411
412        // Field 8: timestamp (TEXT)
413        let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
414        row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
415        row_data.write_all(&timestamp_bytes)?;
416
417        // Field 9: base_fee_per_gas (TEXT, nullable)
418        if let Some(ref base_fee) = block.base_fee_per_gas {
419            let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
420            row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
421            row_data.write_all(&base_fee_bytes)?;
422        } else {
423            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
424        }
425
426        // Field 10: blob_gas_used (TEXT, nullable)
427        if let Some(ref blob_gas) = block.blob_gas_used {
428            let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
429            row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
430            row_data.write_all(&blob_gas_bytes)?;
431        } else {
432            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
433        }
434
435        // Field 11: excess_blob_gas (TEXT, nullable)
436        if let Some(ref excess_blob) = block.excess_blob_gas {
437            let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
438            row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
439            row_data.write_all(&excess_blob_bytes)?;
440        } else {
441            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
442        }
443
444        // Field 12: l1_gas_price (TEXT, nullable)
445        if let Some(ref l1_gas_price) = block.l1_gas_price {
446            let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
447            row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
448            row_data.write_all(&l1_gas_price_bytes)?;
449        } else {
450            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
451        }
452
453        // Field 13: l1_gas_used (INT8, nullable)
454        if let Some(l1_gas_used) = block.l1_gas_used {
455            let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
456            row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
457            row_data.write_all(&l1_gas_used_bytes)?;
458        } else {
459            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
460        }
461
462        // Field 14: l1_fee_scalar (INT8, nullable)
463        if let Some(l1_fee_scalar) = block.l1_fee_scalar {
464            let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
465            row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
466            row_data.write_all(&l1_fee_scalar_bytes)?;
467        } else {
468            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
469        }
470
471        copy_in
472            .send(row_data)
473            .await
474            .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
475        Ok(())
476    }
477
478    /// Writes a single pool swap in PostgreSQL binary format.
479    ///
480    /// Each row in binary format consists of:
481    /// - 2-byte field count
482    /// - For each field: 4-byte length followed by data (or -1 for NULL)
483    async fn write_pool_swap_binary(
484        &self,
485        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
486        chain_id: u32,
487        swap: &PoolSwap,
488    ) -> anyhow::Result<()> {
489        use std::io::Write;
490        let mut row_data = Vec::new();
491
492        // Number of fields (14)
493        row_data.write_all(&14u16.to_be_bytes())?;
494
495        // chain_id (INT4)
496        let chain_id_bytes = (chain_id as i32).to_be_bytes();
497        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
498        row_data.write_all(&chain_id_bytes)?;
499
500        // pool_address (TEXT)
501        let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
502        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
503        row_data.write_all(&pool_address_bytes)?;
504
505        // block (INT8)
506        let block_bytes = (swap.block as i64).to_be_bytes();
507        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
508        row_data.write_all(&block_bytes)?;
509
510        // transaction_hash (TEXT)
511        let tx_hash_bytes = swap.transaction_hash.as_bytes();
512        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
513        row_data.write_all(tx_hash_bytes)?;
514
515        // transaction_index (INT4)
516        let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
517        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
518        row_data.write_all(&tx_index_bytes)?;
519
520        // log_index (INT4)
521        let log_index_bytes = (swap.log_index as i32).to_be_bytes();
522        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
523        row_data.write_all(&log_index_bytes)?;
524
525        // sender (TEXT)
526        let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
527        row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
528        row_data.write_all(&sender_bytes)?;
529
530        // recipient (TEXT)
531        let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
532        row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
533        row_data.write_all(&recipient_bytes)?;
534
535        // side (TEXT or NULL)
536        if let Some(side) = swap.side {
537            let side_bytes = side.to_string().as_bytes().to_vec();
538            row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
539            row_data.write_all(&side_bytes)?;
540        } else {
541            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
542        }
543
544        // size (TEXT or NULL)
545        if let Some(size) = swap.size {
546            let size_bytes = size.to_string().as_bytes().to_vec();
547            row_data.write_all(&(size_bytes.len() as i32).to_be_bytes())?;
548            row_data.write_all(&size_bytes)?;
549        } else {
550            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
551        }
552
553        // price (TEXT or NULL)
554        if let Some(price) = swap.price {
555            let price_bytes = price.to_string().as_bytes().to_vec();
556            row_data.write_all(&(price_bytes.len() as i32).to_be_bytes())?;
557            row_data.write_all(&price_bytes)?;
558        } else {
559            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
560        }
561
562        // sqrt_price_x96 (U160)
563        let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
564        row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
565        row_data.write_all(&sqrt_price_bytes)?;
566
567        // amount0 (I256)
568        let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
569        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
570        row_data.write_all(&amount0_bytes)?;
571
572        // amount1 (I256)
573        let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
574        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
575        row_data.write_all(&amount1_bytes)?;
576
577        copy_in
578            .send(row_data)
579            .await
580            .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
581        Ok(())
582    }
583
584    /// Writes a single pool liquidity update in PostgreSQL binary format.
585    ///
586    /// Each row in binary format consists of:
587    /// - 2-byte field count
588    /// - For each field: 4-byte length followed by data (or -1 for NULL)
589    async fn write_pool_liquidity_update_binary(
590        &self,
591        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
592        chain_id: u32,
593        update: &PoolLiquidityUpdate,
594    ) -> anyhow::Result<()> {
595        use std::io::Write;
596        let mut row_data = Vec::new();
597
598        // Number of fields (14)
599        row_data.write_all(&14u16.to_be_bytes())?;
600
601        // Field 1: chain_id (INT4)
602        let chain_id_bytes = (chain_id as i32).to_be_bytes();
603        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
604        row_data.write_all(&chain_id_bytes)?;
605
606        // Field 2: pool_address (TEXT)
607        let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
608        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
609        row_data.write_all(&pool_address_bytes)?;
610
611        // Field 3: block (INT8)
612        let block_bytes = (update.block as i64).to_be_bytes();
613        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
614        row_data.write_all(&block_bytes)?;
615
616        // Field 4: transaction_hash (TEXT)
617        let tx_hash_bytes = update.transaction_hash.as_bytes();
618        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
619        row_data.write_all(tx_hash_bytes)?;
620
621        // Field 5: transaction_index (INT4)
622        let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
623        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
624        row_data.write_all(&tx_index_bytes)?;
625
626        // Field 6: log_index (INT4)
627        let log_index_bytes = (update.log_index as i32).to_be_bytes();
628        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
629        row_data.write_all(&log_index_bytes)?;
630
631        // Field 7: event_type (TEXT)
632        let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
633        row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
634        row_data.write_all(&event_type_bytes)?;
635
636        // Field 8: sender (TEXT, nullable)
637        if let Some(sender) = update.sender {
638            let sender_bytes = sender.to_string().as_bytes().to_vec();
639            row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
640            row_data.write_all(&sender_bytes)?;
641        } else {
642            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
643        }
644
645        // Field 9: owner (TEXT)
646        let owner_bytes = update.owner.to_string().as_bytes().to_vec();
647        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
648        row_data.write_all(&owner_bytes)?;
649
650        // Field 10: position_liquidity (U128)
651        let position_liquidity_bytes = format_numeric(&update.position_liquidity)
652            .as_bytes()
653            .to_vec();
654        row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
655        row_data.write_all(&position_liquidity_bytes)?;
656
657        // Field 11: amount0 (U256)
658        let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
659        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
660        row_data.write_all(&amount0_bytes)?;
661
662        // Field 12: amount1 (U256)
663        let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
664        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
665        row_data.write_all(&amount1_bytes)?;
666
667        // Field 13: tick_lower (INT4)
668        let tick_lower_bytes = update.tick_lower.to_be_bytes();
669        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
670        row_data.write_all(&tick_lower_bytes)?;
671
672        // Field 14: tick_upper (INT4)
673        let tick_upper_bytes = update.tick_upper.to_be_bytes();
674        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
675        row_data.write_all(&tick_upper_bytes)?;
676
677        copy_in
678            .send(row_data)
679            .await
680            .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
681        Ok(())
682    }
683
684    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
685    ///
686    /// # Errors
687    ///
688    /// Returns an error if the COPY operation fails.
689    pub async fn copy_pool_collects(
690        &self,
691        chain_id: u32,
692        collects: &[PoolFeeCollect],
693    ) -> anyhow::Result<()> {
694        if collects.is_empty() {
695            return Ok(());
696        }
697
698        let copy_statement = r"
699            COPY pool_collect_event (
700                chain_id, pool_address, block, transaction_hash, transaction_index,
701                log_index, owner, amount0, amount1, tick_lower, tick_upper
702            ) FROM STDIN WITH (FORMAT BINARY)";
703
704        let mut copy_in = self
705            .pool
706            .copy_in_raw(copy_statement)
707            .await
708            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
709
710        // Write binary header
711        self.write_copy_header(&mut copy_in).await?;
712
713        // Write each collect event as binary data
714        for collect in collects {
715            self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
716                .await?;
717        }
718
719        // Write binary trailer
720        self.write_copy_trailer(&mut copy_in).await?;
721
722        // Finish the COPY operation
723        copy_in.finish().await.map_err(|e| {
724            // Log detailed information about the failed batch
725            tracing::error!("COPY operation failed for temp_pool_collect batch:");
726            tracing::error!("  Chain ID: {}", chain_id);
727            tracing::error!("  Batch size: {}", collects.len());
728
729            if !collects.is_empty() {
730                tracing::error!(
731                    "  Block range: {} to {}",
732                    collects.iter().map(|c| c.block).min().unwrap_or(0),
733                    collects.iter().map(|c| c.block).max().unwrap_or(0)
734                );
735            }
736
737            // Log first few collects with key details
738            for (i, collect) in collects.iter().take(5).enumerate() {
739                tracing::error!(
740                    "  Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
741                    i,
742                    collect.transaction_hash,
743                    collect.log_index,
744                    collect.block,
745                    collect.pool_address,
746                    collect.owner
747                );
748            }
749
750            if collects.len() > 5 {
751                tracing::error!("  ... and {} more collects", collects.len() - 5);
752            }
753
754            anyhow::anyhow!("Failed to finish COPY operation: {e}")
755        })?;
756
757        Ok(())
758    }
759
760    /// Writes a single pool fee collect in PostgreSQL binary format.
761    ///
762    /// Each row in binary format consists of:
763    /// - 2-byte field count
764    /// - For each field: 4-byte length followed by data (or -1 for NULL)
765    async fn write_pool_fee_collect_binary(
766        &self,
767        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
768        chain_id: u32,
769        collect: &PoolFeeCollect,
770    ) -> anyhow::Result<()> {
771        use std::io::Write;
772        let mut row_data = Vec::new();
773
774        // Number of fields (11)
775        row_data.write_all(&11u16.to_be_bytes())?;
776
777        // Field 1: chain_id (INT4)
778        let chain_id_bytes = (chain_id as i32).to_be_bytes();
779        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
780        row_data.write_all(&chain_id_bytes)?;
781
782        // Field 2: pool_address (TEXT)
783        let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
784        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
785        row_data.write_all(&pool_address_bytes)?;
786
787        // Field 3: block (INT8)
788        let block_bytes = (collect.block as i64).to_be_bytes();
789        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
790        row_data.write_all(&block_bytes)?;
791
792        // Field 4: transaction_hash (TEXT)
793        let tx_hash_bytes = collect.transaction_hash.as_bytes();
794        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
795        row_data.write_all(tx_hash_bytes)?;
796
797        // Field 5: transaction_index (INT4)
798        let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
799        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
800        row_data.write_all(&tx_index_bytes)?;
801
802        // Field 6: log_index (INT4)
803        let log_index_bytes = (collect.log_index as i32).to_be_bytes();
804        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
805        row_data.write_all(&log_index_bytes)?;
806
807        // Field 7: owner (TEXT)
808        let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
809        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
810        row_data.write_all(&owner_bytes)?;
811
812        // Field 8: amount0 (U256)
813        let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
814        row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
815        row_data.write_all(&fee0_bytes)?;
816
817        // Field 9: amount1 (U256)
818        let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
819        row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
820        row_data.write_all(&fee1_bytes)?;
821
822        // Field 10: tick_lower (INT4)
823        let tick_lower_bytes = collect.tick_lower.to_be_bytes();
824        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
825        row_data.write_all(&tick_lower_bytes)?;
826
827        // Field 11: tick_upper (INT4)
828        let tick_upper_bytes = collect.tick_upper.to_be_bytes();
829        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
830        row_data.write_all(&tick_upper_bytes)?;
831
832        copy_in
833            .send(row_data)
834            .await
835            .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
836        Ok(())
837    }
838
839    /// Writes a single token in PostgreSQL binary format.
840    ///
841    /// Each row in binary format consists of:
842    /// - 2-byte field count
843    /// - For each field: 4-byte length followed by data (or -1 for NULL)
844    async fn write_token_binary(
845        &self,
846        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
847        chain_id: u32,
848        token: &Token,
849    ) -> anyhow::Result<()> {
850        use std::io::Write;
851        let mut row_data = Vec::new();
852
853        // Number of fields (5)
854        row_data.write_all(&5u16.to_be_bytes())?;
855
856        // Field 1: chain_id (INT4)
857        let chain_id_bytes = (chain_id as i32).to_be_bytes();
858        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
859        row_data.write_all(&chain_id_bytes)?;
860
861        // Field 2: address (TEXT)
862        let address_bytes = token.address.to_string().as_bytes().to_vec();
863        row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
864        row_data.write_all(&address_bytes)?;
865
866        // Field 3: name (TEXT)
867        let name_bytes = token.name.as_bytes();
868        row_data.write_all(&(name_bytes.len() as i32).to_be_bytes())?;
869        row_data.write_all(name_bytes)?;
870
871        // Field 4: symbol (TEXT)
872        let symbol_bytes = token.symbol.as_bytes();
873        row_data.write_all(&(symbol_bytes.len() as i32).to_be_bytes())?;
874        row_data.write_all(symbol_bytes)?;
875
876        // Field 5: decimals (INT4)
877        let decimals_bytes = (i32::from(token.decimals)).to_be_bytes();
878        row_data.write_all(&(decimals_bytes.len() as i32).to_be_bytes())?;
879        row_data.write_all(&decimals_bytes)?;
880
881        copy_in
882            .send(row_data)
883            .await
884            .map_err(|e| anyhow::anyhow!("Failed to write token data: {e}"))?;
885        Ok(())
886    }
887
888    /// Writes a single pool in PostgreSQL binary format.
889    ///
890    /// Each row in binary format consists of:
891    /// - 2-byte field count
892    /// - For each field: 4-byte length followed by data (or -1 for NULL)
893    async fn write_pool_binary(
894        &self,
895        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
896        chain_id: u32,
897        pool: &Pool,
898    ) -> anyhow::Result<()> {
899        use std::io::Write;
900        let mut row_data = Vec::new();
901
902        // Number of fields (12)
903        row_data.write_all(&12u16.to_be_bytes())?;
904
905        // Field 1: chain_id (INT4)
906        let chain_id_bytes = (chain_id as i32).to_be_bytes();
907        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
908        row_data.write_all(&chain_id_bytes)?;
909
910        // Field 2: dex_name (TEXT)
911        let dex_name_bytes = pool.dex.name.to_string().as_bytes().to_vec();
912        row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
913        row_data.write_all(&dex_name_bytes)?;
914
915        // Field 3: address (TEXT)
916        let address_bytes = pool.address.to_string().as_bytes().to_vec();
917        row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
918        row_data.write_all(&address_bytes)?;
919
920        // Field 4: creation_block (INT8)
921        let creation_block_bytes = (pool.creation_block as i64).to_be_bytes();
922        row_data.write_all(&(creation_block_bytes.len() as i32).to_be_bytes())?;
923        row_data.write_all(&creation_block_bytes)?;
924
925        // Field 5: token0_chain (INT4)
926        let token0_chain_bytes = (pool.token0.chain.chain_id as i32).to_be_bytes();
927        row_data.write_all(&(token0_chain_bytes.len() as i32).to_be_bytes())?;
928        row_data.write_all(&token0_chain_bytes)?;
929
930        // Field 6: token0_address (TEXT)
931        let token0_address_bytes = pool.token0.address.to_string().as_bytes().to_vec();
932        row_data.write_all(&(token0_address_bytes.len() as i32).to_be_bytes())?;
933        row_data.write_all(&token0_address_bytes)?;
934
935        // Field 7: token1_chain (INT4)
936        let token1_chain_bytes = (pool.token1.chain.chain_id as i32).to_be_bytes();
937        row_data.write_all(&(token1_chain_bytes.len() as i32).to_be_bytes())?;
938        row_data.write_all(&token1_chain_bytes)?;
939
940        // Field 8: token1_address (TEXT)
941        let token1_address_bytes = pool.token1.address.to_string().as_bytes().to_vec();
942        row_data.write_all(&(token1_address_bytes.len() as i32).to_be_bytes())?;
943        row_data.write_all(&token1_address_bytes)?;
944
945        // Field 9: fee (INT4, nullable)
946        if let Some(fee) = pool.fee {
947            let fee_bytes = (fee as i32).to_be_bytes();
948            row_data.write_all(&(fee_bytes.len() as i32).to_be_bytes())?;
949            row_data.write_all(&fee_bytes)?;
950        } else {
951            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
952        }
953
954        // Field 10: tick_spacing (INT4, nullable)
955        if let Some(tick_spacing) = pool.tick_spacing {
956            let tick_spacing_bytes = (tick_spacing as i32).to_be_bytes();
957            row_data.write_all(&(tick_spacing_bytes.len() as i32).to_be_bytes())?;
958            row_data.write_all(&tick_spacing_bytes)?;
959        } else {
960            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
961        }
962
963        // Field 11: initial_tick (INT4, nullable)
964        if let Some(initial_tick) = pool.initial_tick {
965            let initial_tick_bytes = initial_tick.to_be_bytes();
966            row_data.write_all(&(initial_tick_bytes.len() as i32).to_be_bytes())?;
967            row_data.write_all(&initial_tick_bytes)?;
968        } else {
969            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
970        }
971
972        // Field 12: initial_sqrt_price_x96 (TEXT, nullable)
973        if let Some(ref initial_sqrt_price) = pool.initial_sqrt_price_x96 {
974            let sqrt_price_bytes = format_numeric(initial_sqrt_price).as_bytes().to_vec();
975            row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
976            row_data.write_all(&sqrt_price_bytes)?;
977        } else {
978            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
979        }
980
981        copy_in
982            .send(row_data)
983            .await
984            .map_err(|e| anyhow::anyhow!("Failed to write pool data: {e}"))?;
985        Ok(())
986    }
987
988    /// Writes the PostgreSQL COPY binary format trailer.
989    ///
990    /// The trailer is a 2-byte value of -1 to indicate end of data.
991    async fn write_copy_trailer(
992        &self,
993        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
994    ) -> anyhow::Result<()> {
995        // Binary trailer: -1 as i16 to indicate end of data
996        let trailer = (-1i16).to_be_bytes();
997        copy_in
998            .send(trailer.to_vec())
999            .await
1000            .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
1001        Ok(())
1002    }
1003}