// nautilus_blockchain/cache/copy.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! PostgreSQL COPY BINARY operations for high-performance bulk data loading.
//!
//! This module provides utilities for using PostgreSQL's COPY command with binary format,
//! which offers significantly better performance than standard INSERT operations for bulk data loading.
21use nautilus_model::defi::{Block, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect};
22use sqlx::{PgPool, postgres::PgPoolCopyExt};
23
24// Helper to convert scientific notation to decimal
25fn format_scientific_to_decimal(s: &str) -> String {
26    use std::str::FromStr;
27
28    use rust_decimal::Decimal;
29
30    match Decimal::from_str(s) {
31        Ok(decimal) => decimal.to_string(),
32        Err(_) => s.to_string(), // Fallback
33    }
34}
35
36/// Formats a numeric value for PostgreSQL NUMERIC type
37fn format_numeric<T: ToString>(value: &T) -> String {
38    let s = value.to_string();
39
40    // Remove any '+' prefix
41    let s = s.trim_start_matches('+');
42
43    // Handle scientific notation by converting to decimal
44    if s.contains('e') || s.contains('E') {
45        return format_scientific_to_decimal(s);
46    }
47
48    // For very large numbers that rust_decimal can't handle,
49    // just return the cleaned string since alloy_primitives already
50    // produces clean decimal format
51    s.to_string()
52}
53
/// Handles PostgreSQL COPY BINARY operations for blockchain data.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
    // Borrowed connection pool used to open COPY streams; the handler
    // never owns connections itself.
    pool: &'a PgPool,
}
59
60impl<'a> PostgresCopyHandler<'a> {
61    /// Creates a new COPY handler with a reference to the database pool.
62    #[must_use]
63    pub const fn new(pool: &'a PgPool) -> Self {
64        Self { pool }
65    }
66
67    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
68    ///
69    /// This method is significantly faster than INSERT for bulk operations as it bypasses
70    /// SQL parsing and uses PostgreSQL's native binary protocol.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the COPY operation fails.
75    pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
76        if blocks.is_empty() {
77            return Ok(());
78        }
79
80        let copy_statement = r"
81            COPY block (
82                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
83                base_fee_per_gas, blob_gas_used, excess_blob_gas,
84                l1_gas_price, l1_gas_used, l1_fee_scalar
85            ) FROM STDIN WITH (FORMAT BINARY)";
86
87        let mut copy_in = self
88            .pool
89            .copy_in_raw(copy_statement)
90            .await
91            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
92
93        // Write binary header
94        self.write_copy_header(&mut copy_in).await?;
95
96        // Write each block as binary data
97        for block in blocks {
98            self.write_block_binary(&mut copy_in, chain_id, block)
99                .await?;
100        }
101
102        // Write binary trailer
103        self.write_copy_trailer(&mut copy_in).await?;
104
105        // Finish the COPY operation
106        copy_in
107            .finish()
108            .await
109            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
110
111        Ok(())
112    }
113
114    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the COPY operation fails.
119    pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
120        if swaps.is_empty() {
121            return Ok(());
122        }
123
124        let copy_statement = r"
125            COPY pool_swap_event (
126                chain_id, pool_address, block, transaction_hash, transaction_index,
127                log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
128            ) FROM STDIN WITH (FORMAT BINARY)";
129
130        let mut copy_in = self
131            .pool
132            .copy_in_raw(copy_statement)
133            .await
134            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
135
136        // Write binary header
137        self.write_copy_header(&mut copy_in).await?;
138
139        // Write each swap as binary data
140        for swap in swaps {
141            self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
142                .await?;
143        }
144
145        // Write binary trailer
146        self.write_copy_trailer(&mut copy_in).await?;
147
148        // Finish the COPY operation
149        copy_in.finish().await.map_err(|e| {
150            // Log detailed information about the failed batch
151            tracing::error!("COPY operation failed for pool_swap batch:");
152            tracing::error!("  Chain ID: {}", chain_id);
153            tracing::error!("  Batch size: {}", swaps.len());
154
155            if !swaps.is_empty() {
156                tracing::error!(
157                    "  Block range: {} to {}",
158                    swaps.iter().map(|s| s.block).min().unwrap_or(0),
159                    swaps.iter().map(|s| s.block).max().unwrap_or(0)
160                );
161            }
162
163            // Log first few swaps with key details
164            for (i, swap) in swaps.iter().take(5).enumerate() {
165                tracing::error!(
166                    "  Swap[{}]: tx={} log_idx={} block={} pool={}",
167                    i,
168                    swap.transaction_hash,
169                    swap.log_index,
170                    swap.block,
171                    swap.pool_address
172                );
173            }
174
175            if swaps.len() > 5 {
176                tracing::error!("  ... and {} more swaps", swaps.len() - 5);
177            }
178
179            anyhow::anyhow!("Failed to finish COPY operation: {e}")
180        })?;
181
182        Ok(())
183    }
184
185    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the COPY operation fails.
190    pub async fn copy_pool_liquidity_updates(
191        &self,
192        chain_id: u32,
193        updates: &[PoolLiquidityUpdate],
194    ) -> anyhow::Result<()> {
195        if updates.is_empty() {
196            return Ok(());
197        }
198
199        let copy_statement = r"
200            COPY pool_liquidity_event (
201                chain_id, pool_address, block, transaction_hash, transaction_index,
202                log_index, event_type, sender, owner, position_liquidity,
203                amount0, amount1, tick_lower, tick_upper
204            ) FROM STDIN WITH (FORMAT BINARY)";
205
206        let mut copy_in = self
207            .pool
208            .copy_in_raw(copy_statement)
209            .await
210            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
211
212        // Write binary header
213        self.write_copy_header(&mut copy_in).await?;
214
215        // Write each liquidity update as binary data
216        for update in updates {
217            self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
218                .await?;
219        }
220
221        // Write binary trailer
222        self.write_copy_trailer(&mut copy_in).await?;
223
224        // Finish the COPY operation
225        copy_in.finish().await.map_err(|e| {
226            // Log detailed information about the failed batch
227            tracing::error!("COPY operation failed for pool_liquidity batch:");
228            tracing::error!("  Chain ID: {}", chain_id);
229            tracing::error!("  Batch size: {}", updates.len());
230
231            if !updates.is_empty() {
232                tracing::error!(
233                    "  Block range: {} to {}",
234                    updates.iter().map(|u| u.block).min().unwrap_or(0),
235                    updates.iter().map(|u| u.block).max().unwrap_or(0)
236                );
237            }
238
239            // Log first few liquidity updates with key details
240            for (i, update) in updates.iter().take(5).enumerate() {
241                tracing::error!(
242                    "  Update[{}]: tx={} log_idx={} block={} pool={} type={}",
243                    i,
244                    update.transaction_hash,
245                    update.log_index,
246                    update.block,
247                    update.pool_address,
248                    update.kind
249                );
250            }
251
252            if updates.len() > 5 {
253                tracing::error!("  ... and {} more updates", updates.len() - 5);
254            }
255
256            anyhow::anyhow!("Failed to finish COPY operation: {e}")
257        })?;
258
259        Ok(())
260    }
261
262    /// Writes the PostgreSQL COPY binary format header.
263    ///
264    /// The header consists of:
265    /// - 11-byte signature: "PGCOPY\n\xff\r\n\0"
266    /// - 4-byte flags field (all zeros)
267    /// - 4-byte header extension length (all zeros)
268    async fn write_copy_header(
269        &self,
270        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
271    ) -> anyhow::Result<()> {
272        use std::io::Write;
273        let mut header = Vec::new();
274
275        // PostgreSQL binary copy header
276        header.write_all(b"PGCOPY\n\xff\r\n\0")?; // Signature
277        header.write_all(&[0, 0, 0, 0])?; // Flags field
278        header.write_all(&[0, 0, 0, 0])?; // Header extension length
279
280        copy_in
281            .send(header)
282            .await
283            .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
284        Ok(())
285    }
286
287    /// Writes a single block in PostgreSQL binary format.
288    ///
289    /// Each row in binary format consists of:
290    /// - 2-byte field count
291    /// - For each field: 4-byte length followed by data (or -1 for NULL)
292    async fn write_block_binary(
293        &self,
294        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
295        chain_id: u32,
296        block: &Block,
297    ) -> anyhow::Result<()> {
298        use std::io::Write;
299        let mut row_data = Vec::new();
300
301        // Number of fields (14)
302        row_data.write_all(&14u16.to_be_bytes())?;
303
304        // Field 1: chain_id (INT4)
305        let chain_id_bytes = (chain_id as i32).to_be_bytes();
306        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
307        row_data.write_all(&chain_id_bytes)?;
308
309        // Field 2: number (INT8)
310        let number_bytes = (block.number as i64).to_be_bytes();
311        row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
312        row_data.write_all(&number_bytes)?;
313
314        // Field 3: hash (TEXT)
315        let hash_bytes = block.hash.as_bytes();
316        row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
317        row_data.write_all(hash_bytes)?;
318
319        // Field 4: parent_hash (TEXT)
320        let parent_hash_bytes = block.parent_hash.as_bytes();
321        row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
322        row_data.write_all(parent_hash_bytes)?;
323
324        // Field 5: miner (TEXT)
325        let miner_bytes = block.miner.to_string().as_bytes().to_vec();
326        row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
327        row_data.write_all(&miner_bytes)?;
328
329        // Field 6: gas_limit (INT8)
330        let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
331        row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
332        row_data.write_all(&gas_limit_bytes)?;
333
334        // Field 7: gas_used (INT8)
335        let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
336        row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
337        row_data.write_all(&gas_used_bytes)?;
338
339        // Field 8: timestamp (TEXT)
340        let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
341        row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
342        row_data.write_all(&timestamp_bytes)?;
343
344        // Field 9: base_fee_per_gas (TEXT, nullable)
345        if let Some(ref base_fee) = block.base_fee_per_gas {
346            let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
347            row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
348            row_data.write_all(&base_fee_bytes)?;
349        } else {
350            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
351        }
352
353        // Field 10: blob_gas_used (TEXT, nullable)
354        if let Some(ref blob_gas) = block.blob_gas_used {
355            let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
356            row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
357            row_data.write_all(&blob_gas_bytes)?;
358        } else {
359            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
360        }
361
362        // Field 11: excess_blob_gas (TEXT, nullable)
363        if let Some(ref excess_blob) = block.excess_blob_gas {
364            let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
365            row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
366            row_data.write_all(&excess_blob_bytes)?;
367        } else {
368            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
369        }
370
371        // Field 12: l1_gas_price (TEXT, nullable)
372        if let Some(ref l1_gas_price) = block.l1_gas_price {
373            let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
374            row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
375            row_data.write_all(&l1_gas_price_bytes)?;
376        } else {
377            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
378        }
379
380        // Field 13: l1_gas_used (INT8, nullable)
381        if let Some(l1_gas_used) = block.l1_gas_used {
382            let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
383            row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
384            row_data.write_all(&l1_gas_used_bytes)?;
385        } else {
386            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
387        }
388
389        // Field 14: l1_fee_scalar (INT8, nullable)
390        if let Some(l1_fee_scalar) = block.l1_fee_scalar {
391            let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
392            row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
393            row_data.write_all(&l1_fee_scalar_bytes)?;
394        } else {
395            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
396        }
397
398        copy_in
399            .send(row_data)
400            .await
401            .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
402        Ok(())
403    }
404
405    /// Writes a single pool swap in PostgreSQL binary format.
406    ///
407    /// Each row in binary format consists of:
408    /// - 2-byte field count
409    /// - For each field: 4-byte length followed by data (or -1 for NULL)
410    async fn write_pool_swap_binary(
411        &self,
412        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
413        chain_id: u32,
414        swap: &PoolSwap,
415    ) -> anyhow::Result<()> {
416        use std::io::Write;
417        let mut row_data = Vec::new();
418
419        // Number of fields (14)
420        row_data.write_all(&14u16.to_be_bytes())?;
421
422        // chain_id (INT4)
423        let chain_id_bytes = (chain_id as i32).to_be_bytes();
424        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
425        row_data.write_all(&chain_id_bytes)?;
426
427        // pool_address (TEXT)
428        let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
429        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
430        row_data.write_all(&pool_address_bytes)?;
431
432        // block (INT8)
433        let block_bytes = (swap.block as i64).to_be_bytes();
434        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
435        row_data.write_all(&block_bytes)?;
436
437        // transaction_hash (TEXT)
438        let tx_hash_bytes = swap.transaction_hash.as_bytes();
439        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
440        row_data.write_all(tx_hash_bytes)?;
441
442        // transaction_index (INT4)
443        let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
444        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
445        row_data.write_all(&tx_index_bytes)?;
446
447        // log_index (INT4)
448        let log_index_bytes = (swap.log_index as i32).to_be_bytes();
449        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
450        row_data.write_all(&log_index_bytes)?;
451
452        // sender (TEXT)
453        let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
454        row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
455        row_data.write_all(&sender_bytes)?;
456
457        // recipient (TEXT)
458        let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
459        row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
460        row_data.write_all(&recipient_bytes)?;
461
462        // side (TEXT or NULL)
463        if let Some(side) = swap.side {
464            let side_bytes = side.to_string().as_bytes().to_vec();
465            row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
466            row_data.write_all(&side_bytes)?;
467        } else {
468            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
469        }
470
471        // size (TEXT or NULL)
472        if let Some(size) = swap.size {
473            let size_bytes = size.to_string().as_bytes().to_vec();
474            row_data.write_all(&(size_bytes.len() as i32).to_be_bytes())?;
475            row_data.write_all(&size_bytes)?;
476        } else {
477            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
478        }
479
480        // price (TEXT or NULL)
481        if let Some(price) = swap.price {
482            let price_bytes = price.to_string().as_bytes().to_vec();
483            row_data.write_all(&(price_bytes.len() as i32).to_be_bytes())?;
484            row_data.write_all(&price_bytes)?;
485        } else {
486            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
487        }
488
489        // sqrt_price_x96 (U160)
490        let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
491        row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
492        row_data.write_all(&sqrt_price_bytes)?;
493
494        // amount0 (I256)
495        let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
496        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
497        row_data.write_all(&amount0_bytes)?;
498
499        // amount1 (I256)
500        let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
501        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
502        row_data.write_all(&amount1_bytes)?;
503
504        copy_in
505            .send(row_data)
506            .await
507            .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
508        Ok(())
509    }
510
511    /// Writes a single pool liquidity update in PostgreSQL binary format.
512    ///
513    /// Each row in binary format consists of:
514    /// - 2-byte field count
515    /// - For each field: 4-byte length followed by data (or -1 for NULL)
516    async fn write_pool_liquidity_update_binary(
517        &self,
518        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
519        chain_id: u32,
520        update: &PoolLiquidityUpdate,
521    ) -> anyhow::Result<()> {
522        use std::io::Write;
523        let mut row_data = Vec::new();
524
525        // Number of fields (14)
526        row_data.write_all(&14u16.to_be_bytes())?;
527
528        // Field 1: chain_id (INT4)
529        let chain_id_bytes = (chain_id as i32).to_be_bytes();
530        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
531        row_data.write_all(&chain_id_bytes)?;
532
533        // Field 2: pool_address (TEXT)
534        let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
535        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
536        row_data.write_all(&pool_address_bytes)?;
537
538        // Field 3: block (INT8)
539        let block_bytes = (update.block as i64).to_be_bytes();
540        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
541        row_data.write_all(&block_bytes)?;
542
543        // Field 4: transaction_hash (TEXT)
544        let tx_hash_bytes = update.transaction_hash.as_bytes();
545        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
546        row_data.write_all(tx_hash_bytes)?;
547
548        // Field 5: transaction_index (INT4)
549        let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
550        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
551        row_data.write_all(&tx_index_bytes)?;
552
553        // Field 6: log_index (INT4)
554        let log_index_bytes = (update.log_index as i32).to_be_bytes();
555        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
556        row_data.write_all(&log_index_bytes)?;
557
558        // Field 7: event_type (TEXT)
559        let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
560        row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
561        row_data.write_all(&event_type_bytes)?;
562
563        // Field 8: sender (TEXT, nullable)
564        if let Some(sender) = update.sender {
565            let sender_bytes = sender.to_string().as_bytes().to_vec();
566            row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
567            row_data.write_all(&sender_bytes)?;
568        } else {
569            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
570        }
571
572        // Field 9: owner (TEXT)
573        let owner_bytes = update.owner.to_string().as_bytes().to_vec();
574        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
575        row_data.write_all(&owner_bytes)?;
576
577        // Field 10: position_liquidity (U128)
578        let position_liquidity_bytes = format_numeric(&update.position_liquidity)
579            .as_bytes()
580            .to_vec();
581        row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
582        row_data.write_all(&position_liquidity_bytes)?;
583
584        // Field 11: amount0 (U256)
585        let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
586        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
587        row_data.write_all(&amount0_bytes)?;
588
589        // Field 12: amount1 (U256)
590        let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
591        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
592        row_data.write_all(&amount1_bytes)?;
593
594        // Field 13: tick_lower (INT4)
595        let tick_lower_bytes = update.tick_lower.to_be_bytes();
596        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
597        row_data.write_all(&tick_lower_bytes)?;
598
599        // Field 14: tick_upper (INT4)
600        let tick_upper_bytes = update.tick_upper.to_be_bytes();
601        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
602        row_data.write_all(&tick_upper_bytes)?;
603
604        copy_in
605            .send(row_data)
606            .await
607            .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
608        Ok(())
609    }
610
611    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
612    ///
613    /// # Errors
614    ///
615    /// Returns an error if the COPY operation fails.
616    pub async fn copy_pool_collects(
617        &self,
618        chain_id: u32,
619        collects: &[PoolFeeCollect],
620    ) -> anyhow::Result<()> {
621        if collects.is_empty() {
622            return Ok(());
623        }
624
625        let copy_statement = r"
626            COPY pool_collect_event (
627                chain_id, pool_address, block, transaction_hash, transaction_index,
628                log_index, owner, amount0, amount1, tick_lower, tick_upper
629            ) FROM STDIN WITH (FORMAT BINARY)";
630
631        let mut copy_in = self
632            .pool
633            .copy_in_raw(copy_statement)
634            .await
635            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
636
637        // Write binary header
638        self.write_copy_header(&mut copy_in).await?;
639
640        // Write each collect event as binary data
641        for collect in collects {
642            self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
643                .await?;
644        }
645
646        // Write binary trailer
647        self.write_copy_trailer(&mut copy_in).await?;
648
649        // Finish the COPY operation
650        copy_in.finish().await.map_err(|e| {
651            // Log detailed information about the failed batch
652            tracing::error!("COPY operation failed for temp_pool_collect batch:");
653            tracing::error!("  Chain ID: {}", chain_id);
654            tracing::error!("  Batch size: {}", collects.len());
655
656            if !collects.is_empty() {
657                tracing::error!(
658                    "  Block range: {} to {}",
659                    collects.iter().map(|c| c.block).min().unwrap_or(0),
660                    collects.iter().map(|c| c.block).max().unwrap_or(0)
661                );
662            }
663
664            // Log first few collects with key details
665            for (i, collect) in collects.iter().take(5).enumerate() {
666                tracing::error!(
667                    "  Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
668                    i,
669                    collect.transaction_hash,
670                    collect.log_index,
671                    collect.block,
672                    collect.pool_address,
673                    collect.owner
674                );
675            }
676
677            if collects.len() > 5 {
678                tracing::error!("  ... and {} more collects", collects.len() - 5);
679            }
680
681            anyhow::anyhow!("Failed to finish COPY operation: {e}")
682        })?;
683
684        Ok(())
685    }
686
687    /// Writes a single pool fee collect in PostgreSQL binary format.
688    ///
689    /// Each row in binary format consists of:
690    /// - 2-byte field count
691    /// - For each field: 4-byte length followed by data (or -1 for NULL)
692    async fn write_pool_fee_collect_binary(
693        &self,
694        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
695        chain_id: u32,
696        collect: &PoolFeeCollect,
697    ) -> anyhow::Result<()> {
698        use std::io::Write;
699        let mut row_data = Vec::new();
700
701        // Number of fields (11)
702        row_data.write_all(&11u16.to_be_bytes())?;
703
704        // Field 1: chain_id (INT4)
705        let chain_id_bytes = (chain_id as i32).to_be_bytes();
706        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
707        row_data.write_all(&chain_id_bytes)?;
708
709        // Field 2: pool_address (TEXT)
710        let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
711        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
712        row_data.write_all(&pool_address_bytes)?;
713
714        // Field 3: block (INT8)
715        let block_bytes = (collect.block as i64).to_be_bytes();
716        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
717        row_data.write_all(&block_bytes)?;
718
719        // Field 4: transaction_hash (TEXT)
720        let tx_hash_bytes = collect.transaction_hash.as_bytes();
721        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
722        row_data.write_all(tx_hash_bytes)?;
723
724        // Field 5: transaction_index (INT4)
725        let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
726        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
727        row_data.write_all(&tx_index_bytes)?;
728
729        // Field 6: log_index (INT4)
730        let log_index_bytes = (collect.log_index as i32).to_be_bytes();
731        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
732        row_data.write_all(&log_index_bytes)?;
733
734        // Field 7: owner (TEXT)
735        let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
736        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
737        row_data.write_all(&owner_bytes)?;
738
739        // Field 8: amount0 (U256)
740        let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
741        row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
742        row_data.write_all(&fee0_bytes)?;
743
744        // Field 9: amount1 (U256)
745        let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
746        row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
747        row_data.write_all(&fee1_bytes)?;
748
749        // Field 10: tick_lower (INT4)
750        let tick_lower_bytes = collect.tick_lower.to_be_bytes();
751        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
752        row_data.write_all(&tick_lower_bytes)?;
753
754        // Field 11: tick_upper (INT4)
755        let tick_upper_bytes = collect.tick_upper.to_be_bytes();
756        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
757        row_data.write_all(&tick_upper_bytes)?;
758
759        copy_in
760            .send(row_data)
761            .await
762            .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
763        Ok(())
764    }
765
766    /// Writes the PostgreSQL COPY binary format trailer.
767    ///
768    /// The trailer is a 2-byte value of -1 to indicate end of data.
769    async fn write_copy_trailer(
770        &self,
771        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
772    ) -> anyhow::Result<()> {
773        // Binary trailer: -1 as i16 to indicate end of data
774        let trailer = (-1i16).to_be_bytes();
775        copy_in
776            .send(trailer.to_vec())
777            .await
778            .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
779        Ok(())
780    }
781}