nautilus_blockchain/cache/copy.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! PostgreSQL COPY BINARY operations for high-performance bulk data loading.
//!
//! This module provides utilities for using PostgreSQL's COPY command with binary format,
//! which offers significantly better performance than standard INSERT operations for bulk loading.
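//!
//! A minimal usage sketch (hedged: the crate import path, connection string, and
//! chain ID below are illustrative assumptions, not defined by this module):
//!
//! ```rust,no_run
//! use nautilus_blockchain::cache::copy::PostgresCopyHandler; // path assumed from file location
//! use nautilus_model::defi::Block;
//! use sqlx::PgPool;
//!
//! async fn load_blocks(blocks: &[Block]) -> anyhow::Result<()> {
//!     let pool = PgPool::connect("postgres://localhost/nautilus").await?; // assumed DSN
//!     let handler = PostgresCopyHandler::new(&pool);
//!     handler.copy_blocks(1, blocks).await?; // chain_id 1 = Ethereum mainnet
//!     Ok(())
//! }
//! ```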

use nautilus_model::defi::{Block, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect};
use sqlx::{PgPool, postgres::PgPoolCopyExt};

/// Handles PostgreSQL COPY BINARY operations for blockchain data.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
    pool: &'a PgPool,
}

impl<'a> PostgresCopyHandler<'a> {
    /// Creates a new COPY handler with a reference to the database pool.
    #[must_use]
    pub const fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// This method is significantly faster than INSERT for bulk operations as it bypasses
    /// SQL parsing and uses PostgreSQL's native binary protocol.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY block (
                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
                base_fee_per_gas, blob_gas_used, excess_blob_gas,
                l1_gas_price, l1_gas_used, l1_fee_scalar
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each block as binary data
        for block in blocks {
            self.write_block_binary(&mut copy_in, chain_id, block)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in
            .finish()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;

        Ok(())
    }

    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
        if swaps.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY pool_swap_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, sender, side, size, price
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each swap as binary data
        for swap in swaps {
            self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in.finish().await.map_err(|e| {
            // Log detailed information about the failed batch
            tracing::error!("COPY operation failed for pool_swap batch:");
            tracing::error!("  Chain ID: {}", chain_id);
            tracing::error!("  Batch size: {}", swaps.len());

            if !swaps.is_empty() {
                tracing::error!(
                    "  Block range: {} to {}",
                    swaps.iter().map(|s| s.block).min().unwrap_or(0),
                    swaps.iter().map(|s| s.block).max().unwrap_or(0)
                );
            }

            // Log first few swaps with key details
            for (i, swap) in swaps.iter().take(5).enumerate() {
                tracing::error!(
                    "  Swap[{}]: tx={} log_idx={} block={} pool={}",
                    i,
                    swap.transaction_hash,
                    swap.log_index,
                    swap.block,
                    swap.pool_address
                );
            }

            if swaps.len() > 5 {
                tracing::error!("  ... and {} more swaps", swaps.len() - 5);
            }

            anyhow::anyhow!("Failed to finish COPY operation: {e}")
        })?;

        Ok(())
    }

    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pool_liquidity_updates(
        &self,
        chain_id: u32,
        updates: &[PoolLiquidityUpdate],
    ) -> anyhow::Result<()> {
        if updates.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY pool_liquidity_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, event_type, sender, owner, position_liquidity,
                amount0, amount1, tick_lower, tick_upper
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each liquidity update as binary data
        for update in updates {
            self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in.finish().await.map_err(|e| {
            // Log detailed information about the failed batch
            tracing::error!("COPY operation failed for pool_liquidity batch:");
            tracing::error!("  Chain ID: {}", chain_id);
            tracing::error!("  Batch size: {}", updates.len());

            if !updates.is_empty() {
                tracing::error!(
                    "  Block range: {} to {}",
                    updates.iter().map(|u| u.block).min().unwrap_or(0),
                    updates.iter().map(|u| u.block).max().unwrap_or(0)
                );
            }

            // Log first few liquidity updates with key details
            for (i, update) in updates.iter().take(5).enumerate() {
                tracing::error!(
                    "  Update[{}]: tx={} log_idx={} block={} pool={} type={}",
                    i,
                    update.transaction_hash,
                    update.log_index,
                    update.block,
                    update.pool_address,
                    update.kind
                );
            }

            if updates.len() > 5 {
                tracing::error!("  ... and {} more updates", updates.len() - 5);
            }

            anyhow::anyhow!("Failed to finish COPY operation: {e}")
        })?;

        Ok(())
    }

    /// Writes the PostgreSQL COPY binary format header.
    ///
    /// The header consists of:
    /// - 11-byte signature: "PGCOPY\n\xff\r\n\0"
    /// - 4-byte flags field (all zeros)
    /// - 4-byte header extension length (all zeros)
    async fn write_copy_header(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut header = Vec::new();

        // PostgreSQL binary copy header
        header.write_all(b"PGCOPY\n\xff\r\n\0").unwrap(); // Signature
        header.write_all(&[0, 0, 0, 0]).unwrap(); // Flags field
        header.write_all(&[0, 0, 0, 0]).unwrap(); // Header extension length

        copy_in
            .send(header)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
        Ok(())
    }

    /// Writes a single block in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
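    ///
    /// For example (a worked illustration, not tied to specific data): an INT4
    /// chain_id of 1 is encoded as the 8 bytes `00 00 00 04 00 00 00 01`
    /// (big-endian length 4, then the big-endian value), while a NULL field is
    /// just the 4 bytes `FF FF FF FF` (length -1, no payload).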
    async fn write_block_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        block: &Block,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (14)
        row_data.write_all(&14u16.to_be_bytes()).unwrap();

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data
            .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&chain_id_bytes).unwrap();

        // Field 2: number (INT8)
        let number_bytes = (block.number as i64).to_be_bytes();
        row_data
            .write_all(&(number_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&number_bytes).unwrap();

        // Field 3: hash (TEXT)
        let hash_bytes = block.hash.as_bytes();
        row_data
            .write_all(&(hash_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(hash_bytes).unwrap();

        // Field 4: parent_hash (TEXT)
        let parent_hash_bytes = block.parent_hash.as_bytes();
        row_data
            .write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(parent_hash_bytes).unwrap();

        // Field 5: miner (TEXT)
        let miner_bytes = block.miner.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(miner_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&miner_bytes).unwrap();

        // Field 6: gas_limit (INT8)
        let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
        row_data
            .write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&gas_limit_bytes).unwrap();

        // Field 7: gas_used (INT8)
        let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
        row_data
            .write_all(&(gas_used_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&gas_used_bytes).unwrap();

        // Field 8: timestamp (TEXT)
        let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(timestamp_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&timestamp_bytes).unwrap();

        // Field 9: base_fee_per_gas (TEXT, nullable)
        if let Some(ref base_fee) = block.base_fee_per_gas {
            let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
            row_data
                .write_all(&(base_fee_bytes.len() as i32).to_be_bytes())
                .unwrap();
            row_data.write_all(&base_fee_bytes).unwrap();
        } else {
            row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); // NULL value
        }

        // Field 10: blob_gas_used (TEXT, nullable)
        if let Some(ref blob_gas) = block.blob_gas_used {
            let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
            row_data
                .write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())
                .unwrap();
            row_data.write_all(&blob_gas_bytes).unwrap();
        } else {
            row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); // NULL value
        }

        // Field 11: excess_blob_gas (TEXT, nullable)
        if let Some(ref excess_blob) = block.excess_blob_gas {
            let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
            row_data
                .write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())
                .unwrap();
            row_data.write_all(&excess_blob_bytes).unwrap();
        } else {
            row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); // NULL value
        }

        // Field 12: l1_gas_price (TEXT, nullable)
        if let Some(ref l1_gas_price) = block.l1_gas_price {
            let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
            row_data
                .write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())
                .unwrap();
            row_data.write_all(&l1_gas_price_bytes).unwrap();
        } else {
            row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); // NULL value
        }

        // Field 13: l1_gas_used (INT8, nullable)
        if let Some(l1_gas_used) = block.l1_gas_used {
            let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
            row_data
                .write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())
                .unwrap();
            row_data.write_all(&l1_gas_used_bytes).unwrap();
        } else {
            row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); // NULL value
        }

        // Field 14: l1_fee_scalar (INT8, nullable)
        if let Some(l1_fee_scalar) = block.l1_fee_scalar {
            let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
            row_data
                .write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())
                .unwrap();
            row_data.write_all(&l1_fee_scalar_bytes).unwrap();
        } else {
            row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); // NULL value
        }

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
        Ok(())
    }
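
    // A hedged refactoring sketch (illustrative only; these helpers are not
    // wired into the methods above or below): the repeated length-prefixed
    // field writes could be factored out like this. Names are assumptions.
    #[allow(dead_code)]
    fn write_text_field(row: &mut Vec<u8>, value: &str) {
        row.extend_from_slice(&(value.len() as i32).to_be_bytes()); // 4-byte big-endian length
        row.extend_from_slice(value.as_bytes()); // UTF-8 payload
    }

    #[allow(dead_code)]
    fn write_null_field(row: &mut Vec<u8>) {
        row.extend_from_slice(&(-1i32).to_be_bytes()); // length -1 marks NULL
    }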

    /// Writes a single pool swap in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_pool_swap_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        swap: &PoolSwap,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (10)
        row_data.write_all(&10u16.to_be_bytes()).unwrap();

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data
            .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&chain_id_bytes).unwrap();

        // Field 2: pool_address (TEXT)
        let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(pool_address_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&pool_address_bytes).unwrap();

        // Field 3: block (INT8)
        let block_bytes = (swap.block as i64).to_be_bytes();
        row_data
            .write_all(&(block_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&block_bytes).unwrap();

        // Field 4: transaction_hash (TEXT)
        let tx_hash_bytes = swap.transaction_hash.as_bytes();
        row_data
            .write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(tx_hash_bytes).unwrap();

        // Field 5: transaction_index (INT4)
        let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
        row_data
            .write_all(&(tx_index_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&tx_index_bytes).unwrap();

        // Field 6: log_index (INT4)
        let log_index_bytes = (swap.log_index as i32).to_be_bytes();
        row_data
            .write_all(&(log_index_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&log_index_bytes).unwrap();

        // Field 7: sender (TEXT)
        let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(sender_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&sender_bytes).unwrap();

        // Field 8: side (TEXT)
        let side_bytes = swap.side.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(side_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&side_bytes).unwrap();

        // Field 9: size (TEXT)
        let size_bytes = swap.size.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(size_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&size_bytes).unwrap();

        // Field 10: price (TEXT)
        let price_bytes = swap.price.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(price_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&price_bytes).unwrap();

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
        Ok(())
    }

    /// Writes a single pool liquidity update in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_pool_liquidity_update_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        update: &PoolLiquidityUpdate,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (14)
        row_data.write_all(&14u16.to_be_bytes()).unwrap();

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data
            .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&chain_id_bytes).unwrap();

        // Field 2: pool_address (TEXT)
        let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(pool_address_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&pool_address_bytes).unwrap();

        // Field 3: block (INT8)
        let block_bytes = (update.block as i64).to_be_bytes();
        row_data
            .write_all(&(block_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&block_bytes).unwrap();

        // Field 4: transaction_hash (TEXT)
        let tx_hash_bytes = update.transaction_hash.as_bytes();
        row_data
            .write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(tx_hash_bytes).unwrap();

        // Field 5: transaction_index (INT4)
        let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
        row_data
            .write_all(&(tx_index_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&tx_index_bytes).unwrap();

        // Field 6: log_index (INT4)
        let log_index_bytes = (update.log_index as i32).to_be_bytes();
        row_data
            .write_all(&(log_index_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&log_index_bytes).unwrap();

        // Field 7: event_type (TEXT)
        let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(event_type_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&event_type_bytes).unwrap();

        // Field 8: sender (TEXT, nullable)
        if let Some(sender) = update.sender {
            let sender_bytes = sender.to_string().as_bytes().to_vec();
            row_data
                .write_all(&(sender_bytes.len() as i32).to_be_bytes())
                .unwrap();
            row_data.write_all(&sender_bytes).unwrap();
        } else {
            row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); // NULL value
        }

        // Field 9: owner (TEXT)
        let owner_bytes = update.owner.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(owner_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&owner_bytes).unwrap();

        // Field 10: position_liquidity (TEXT)
        let position_liquidity_bytes = update.position_liquidity.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&position_liquidity_bytes).unwrap();

        // Field 11: amount0 (TEXT)
        let amount0_bytes = update.amount0.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(amount0_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&amount0_bytes).unwrap();

        // Field 12: amount1 (TEXT)
        let amount1_bytes = update.amount1.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(amount1_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&amount1_bytes).unwrap();

        // Field 13: tick_lower (INT4)
        let tick_lower_bytes = update.tick_lower.to_be_bytes();
        row_data
            .write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&tick_lower_bytes).unwrap();

        // Field 14: tick_upper (INT4)
        let tick_upper_bytes = update.tick_upper.to_be_bytes();
        row_data
            .write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&tick_upper_bytes).unwrap();

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
        Ok(())
    }

    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pool_collects(
        &self,
        chain_id: u32,
        collects: &[PoolFeeCollect],
    ) -> anyhow::Result<()> {
        if collects.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY pool_collect_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, owner, fee0, fee1, tick_lower, tick_upper
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each collect event as binary data
        for collect in collects {
            self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in.finish().await.map_err(|e| {
            // Log detailed information about the failed batch
            tracing::error!("COPY operation failed for pool_fee_collect batch:");
            tracing::error!("  Chain ID: {}", chain_id);
            tracing::error!("  Batch size: {}", collects.len());

            if !collects.is_empty() {
                tracing::error!(
                    "  Block range: {} to {}",
                    collects.iter().map(|c| c.block).min().unwrap_or(0),
                    collects.iter().map(|c| c.block).max().unwrap_or(0)
                );
            }

            // Log first few collects with key details
            for (i, collect) in collects.iter().take(5).enumerate() {
                tracing::error!(
                    "  Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
                    i,
                    collect.transaction_hash,
                    collect.log_index,
                    collect.block,
                    collect.pool_address,
                    collect.owner
                );
            }

            if collects.len() > 5 {
                tracing::error!("  ... and {} more collects", collects.len() - 5);
            }

            anyhow::anyhow!("Failed to finish COPY operation: {e}")
        })?;

        Ok(())
    }

    /// Writes a single pool fee collect in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_pool_fee_collect_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        collect: &PoolFeeCollect,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (11)
        row_data.write_all(&11u16.to_be_bytes()).unwrap();

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data
            .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&chain_id_bytes).unwrap();

        // Field 2: pool_address (TEXT)
        let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(pool_address_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&pool_address_bytes).unwrap();

        // Field 3: block (INT8)
        let block_bytes = (collect.block as i64).to_be_bytes();
        row_data
            .write_all(&(block_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&block_bytes).unwrap();

        // Field 4: transaction_hash (TEXT)
        let tx_hash_bytes = collect.transaction_hash.as_bytes();
        row_data
            .write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(tx_hash_bytes).unwrap();

        // Field 5: transaction_index (INT4)
        let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
        row_data
            .write_all(&(tx_index_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&tx_index_bytes).unwrap();

        // Field 6: log_index (INT4)
        let log_index_bytes = (collect.log_index as i32).to_be_bytes();
        row_data
            .write_all(&(log_index_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&log_index_bytes).unwrap();

        // Field 7: owner (TEXT)
        let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(owner_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&owner_bytes).unwrap();

        // Field 8: fee0 (TEXT)
        let fee0_bytes = collect.fee0.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(fee0_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&fee0_bytes).unwrap();

        // Field 9: fee1 (TEXT)
        let fee1_bytes = collect.fee1.to_string().as_bytes().to_vec();
        row_data
            .write_all(&(fee1_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&fee1_bytes).unwrap();

        // Field 10: tick_lower (INT4)
        let tick_lower_bytes = collect.tick_lower.to_be_bytes();
        row_data
            .write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&tick_lower_bytes).unwrap();

        // Field 11: tick_upper (INT4)
        let tick_upper_bytes = collect.tick_upper.to_be_bytes();
        row_data
            .write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())
            .unwrap();
        row_data.write_all(&tick_upper_bytes).unwrap();

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
        Ok(())
    }

    /// Writes the PostgreSQL COPY binary format trailer.
    ///
    /// The trailer is a 2-byte value of -1 to indicate end of data.
    async fn write_copy_trailer(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    ) -> anyhow::Result<()> {
        // Binary trailer: -1 as i16 to indicate end of data
        let trailer = (-1i16).to_be_bytes();
        copy_in
            .send(trailer.to_vec())
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
        Ok(())
    }
}
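
// A minimal sketch of offline tests, assuming only the byte layout documented
// above (no database connection required); module and test names are assumptions.
#[cfg(test)]
mod tests {
    #[test]
    fn binary_header_is_19_bytes() {
        let mut header = Vec::new();
        header.extend_from_slice(b"PGCOPY\n\xff\r\n\0"); // 11-byte signature
        header.extend_from_slice(&[0u8; 4]); // flags field
        header.extend_from_slice(&[0u8; 4]); // header extension length
        assert_eq!(header.len(), 19);
    }

    #[test]
    fn binary_trailer_is_minus_one_i16() {
        // The COPY BINARY trailer is a single big-endian i16 of -1
        assert_eq!((-1i16).to_be_bytes(), [0xFF, 0xFF]);
    }
}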