nautilus_blockchain/cache/copy.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! PostgreSQL COPY BINARY operations for high-performance bulk data loading.
//!
//! This module provides utilities for using PostgreSQL's COPY command with binary format,
//! which offers significantly better performance than standard INSERT operations for bulk data loading.
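//!
//! # Example
//!
//! A minimal usage sketch; the pool handle, chain ID, and data slices below are
//! placeholders for illustration only:
//!
//! ```ignore
//! let handler = PostgresCopyHandler::new(&pg_pool);
//! handler.copy_blocks(chain_id, &blocks).await?;
//! handler.copy_pool_swaps(chain_id, &swaps).await?;
//! ```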

use nautilus_model::defi::{
    Block, Pool, PoolLiquidityUpdate, PoolSwap, Token, data::PoolFeeCollect,
};
use sqlx::{PgPool, postgres::PgPoolCopyExt};

/// Converts a string in scientific notation to a plain decimal string.
///
/// Falls back to returning the input unchanged if it cannot be parsed.
fn format_scientific_to_decimal(s: &str) -> String {
    use std::str::FromStr;

    use rust_decimal::Decimal;

    // `from_scientific` understands exponent notation, which plain `from_str` may reject;
    // fall back to plain parsing, and finally to the original string
    match Decimal::from_scientific(s).or_else(|_| Decimal::from_str(s)) {
        Ok(decimal) => decimal.to_string(),
        Err(_) => s.to_string(), // Fallback
    }
}

/// Formats a numeric value as a plain decimal string for a PostgreSQL NUMERIC column.
fn format_numeric<T: ToString>(value: &T) -> String {
    let s = value.to_string();

    // Remove any '+' prefix
    let s = s.trim_start_matches('+');

    // Handle scientific notation by converting to decimal
    if s.contains('e') || s.contains('E') {
        return format_scientific_to_decimal(s);
    }

    // For very large numbers that rust_decimal can't handle, just return the
    // cleaned string, since alloy_primitives already produces clean decimal format
    s.to_string()
}

/// Handles PostgreSQL COPY BINARY operations for blockchain data.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
    pool: &'a PgPool,
}

impl<'a> PostgresCopyHandler<'a> {
    /// Creates a new COPY handler with a reference to the database pool.
    #[must_use]
    pub const fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// This method is significantly faster than INSERT for bulk operations as it bypasses
    /// SQL parsing and uses PostgreSQL's native binary protocol.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY block (
                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
                base_fee_per_gas, blob_gas_used, excess_blob_gas,
                l1_gas_price, l1_gas_used, l1_fee_scalar
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each block as binary data
        for block in blocks {
            self.write_block_binary(&mut copy_in, chain_id, block)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in
            .finish()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;

        Ok(())
    }

    /// Inserts tokens using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_tokens(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
        if tokens.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY token (
                chain_id, address, name, symbol, decimals
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        self.write_copy_header(&mut copy_in).await?;
        for token in tokens {
            self.write_token_binary(&mut copy_in, chain_id, token)
                .await?;
        }
        self.write_copy_trailer(&mut copy_in).await?;
        copy_in
            .finish()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;

        Ok(())
    }

    /// Inserts pools using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pools(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
        if pools.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY pool (
                chain_id, dex_name, address, creation_block,
                token0_chain, token0_address, token1_chain, token1_address,
                fee, tick_spacing, initial_tick, initial_sqrt_price_x96
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        self.write_copy_header(&mut copy_in).await?;
        for pool in pools {
            self.write_pool_binary(&mut copy_in, chain_id, pool).await?;
        }
        self.write_copy_trailer(&mut copy_in).await?;
        copy_in
            .finish()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;

        Ok(())
    }

    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
        if swaps.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY pool_swap_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
                order_side, base_quantity, quote_quantity, spot_price, execution_price
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each swap as binary data
        for swap in swaps {
            self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in.finish().await.map_err(|e| {
            // Log detailed information about the failed batch
            tracing::error!("COPY operation failed for pool_swap batch:");
            tracing::error!("  Chain ID: {}", chain_id);
            tracing::error!("  Batch size: {}", swaps.len());

            if !swaps.is_empty() {
                tracing::error!(
                    "  Block range: {} to {}",
                    swaps.iter().map(|s| s.block).min().unwrap_or(0),
                    swaps.iter().map(|s| s.block).max().unwrap_or(0)
                );
            }

            // Log first few swaps with key details
            for (i, swap) in swaps.iter().take(5).enumerate() {
                tracing::error!(
                    "  Swap[{}]: tx={} log_idx={} block={} pool={}",
                    i,
                    swap.transaction_hash,
                    swap.log_index,
                    swap.block,
                    swap.pool_address
                );
            }

            if swaps.len() > 5 {
                tracing::error!("  ... and {} more swaps", swaps.len() - 5);
            }

            anyhow::anyhow!("Failed to finish COPY operation: {e}")
        })?;

        Ok(())
    }

    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pool_liquidity_updates(
        &self,
        chain_id: u32,
        updates: &[PoolLiquidityUpdate],
    ) -> anyhow::Result<()> {
        if updates.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY pool_liquidity_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, event_type, sender, owner, position_liquidity,
                amount0, amount1, tick_lower, tick_upper
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each liquidity update as binary data
        for update in updates {
            self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in.finish().await.map_err(|e| {
            // Log detailed information about the failed batch
            tracing::error!("COPY operation failed for pool_liquidity batch:");
            tracing::error!("  Chain ID: {}", chain_id);
            tracing::error!("  Batch size: {}", updates.len());

            if !updates.is_empty() {
                tracing::error!(
                    "  Block range: {} to {}",
                    updates.iter().map(|u| u.block).min().unwrap_or(0),
                    updates.iter().map(|u| u.block).max().unwrap_or(0)
                );
            }

            // Log first few liquidity updates with key details
            for (i, update) in updates.iter().take(5).enumerate() {
                tracing::error!(
                    "  Update[{}]: tx={} log_idx={} block={} pool={} type={}",
                    i,
                    update.transaction_hash,
                    update.log_index,
                    update.block,
                    update.pool_address,
                    update.kind
                );
            }

            if updates.len() > 5 {
                tracing::error!("  ... and {} more updates", updates.len() - 5);
            }

            anyhow::anyhow!("Failed to finish COPY operation: {e}")
        })?;

        Ok(())
    }

    /// Writes the PostgreSQL COPY binary format header.
    ///
    /// The header consists of:
    /// - 11-byte signature: "PGCOPY\n\xff\r\n\0"
    /// - 4-byte flags field (all zeros)
    /// - 4-byte header extension length (all zeros)
    async fn write_copy_header(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut header = Vec::new();

        // PostgreSQL binary copy header
        header.write_all(b"PGCOPY\n\xff\r\n\0")?; // Signature
        header.write_all(&[0, 0, 0, 0])?; // Flags field
        header.write_all(&[0, 0, 0, 0])?; // Header extension length

        copy_in
            .send(header)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
        Ok(())
    }

    /// Writes a single block in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_block_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        block: &Block,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (14)
        row_data.write_all(&14u16.to_be_bytes())?;

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&chain_id_bytes)?;

        // Field 2: number (INT8)
        let number_bytes = (block.number as i64).to_be_bytes();
        row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&number_bytes)?;

        // Field 3: hash (TEXT)
        let hash_bytes = block.hash.as_bytes();
        row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(hash_bytes)?;

        // Field 4: parent_hash (TEXT)
        let parent_hash_bytes = block.parent_hash.as_bytes();
        row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(parent_hash_bytes)?;

        // Field 5: miner (TEXT)
        let miner_bytes = block.miner.to_string().as_bytes().to_vec();
        row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&miner_bytes)?;

        // Field 6: gas_limit (INT8)
        let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
        row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&gas_limit_bytes)?;

        // Field 7: gas_used (INT8)
        let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
        row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&gas_used_bytes)?;

        // Field 8: timestamp (TEXT)
        let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
        row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&timestamp_bytes)?;

        // Field 9: base_fee_per_gas (TEXT, nullable)
        if let Some(ref base_fee) = block.base_fee_per_gas {
            let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
            row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&base_fee_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
        }

        // Field 10: blob_gas_used (TEXT, nullable)
        if let Some(ref blob_gas) = block.blob_gas_used {
            let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
            row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&blob_gas_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
        }

        // Field 11: excess_blob_gas (TEXT, nullable)
        if let Some(ref excess_blob) = block.excess_blob_gas {
            let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
            row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&excess_blob_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
        }

        // Field 12: l1_gas_price (TEXT, nullable)
        if let Some(ref l1_gas_price) = block.l1_gas_price {
            let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
            row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&l1_gas_price_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
        }

        // Field 13: l1_gas_used (INT8, nullable)
        if let Some(l1_gas_used) = block.l1_gas_used {
            let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
            row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&l1_gas_used_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
        }

        // Field 14: l1_fee_scalar (INT8, nullable)
        if let Some(l1_fee_scalar) = block.l1_fee_scalar {
            let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
            row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&l1_fee_scalar_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
        }

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
        Ok(())
    }

    /// Writes a single pool swap in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_pool_swap_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        swap: &PoolSwap,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (18): chain_id, pool_address, block, transaction_hash, transaction_index,
        // log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
        // order_side, base_quantity, quote_quantity, spot_price, execution_price
        row_data.write_all(&18u16.to_be_bytes())?;

        // chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&chain_id_bytes)?;

        // pool_address (TEXT)
        let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&pool_address_bytes)?;

        // block (INT8)
        let block_bytes = (swap.block as i64).to_be_bytes();
        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&block_bytes)?;

        // transaction_hash (TEXT)
        let tx_hash_bytes = swap.transaction_hash.as_bytes();
        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(tx_hash_bytes)?;

        // transaction_index (INT4)
        let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tx_index_bytes)?;

        // log_index (INT4)
        let log_index_bytes = (swap.log_index as i32).to_be_bytes();
        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&log_index_bytes)?;

        // sender (TEXT)
        let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
        row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&sender_bytes)?;

        // recipient (TEXT)
        let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
        row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&recipient_bytes)?;

        // sqrt_price_x96 (U160)
        let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
        row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&sqrt_price_bytes)?;

        // liquidity (U128)
        let liquidity_bytes = format_numeric(&swap.liquidity).as_bytes().to_vec();
        row_data.write_all(&(liquidity_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&liquidity_bytes)?;

        // tick (INT4)
        let tick_bytes = swap.tick.to_be_bytes();
        row_data.write_all(&(tick_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tick_bytes)?;

        // amount0 (I256)
        let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&amount0_bytes)?;

        // amount1 (I256)
        let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&amount1_bytes)?;

        // New fields from trade_info: order_side, base_quantity, quote_quantity, spot_price, execution_price
        if let Some(trade_info) = &swap.trade_info {
            // order_side (TEXT)
            let side_bytes = trade_info.order_side.to_string().as_bytes().to_vec();
            row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&side_bytes)?;

            // base_quantity (NUMERIC) - convert Decimal to string representation
            let base_qty_decimal = trade_info.quantity_base.as_decimal();
            let base_qty_str = base_qty_decimal.to_string();
            let base_qty_bytes = base_qty_str.as_bytes();
            row_data.write_all(&(base_qty_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(base_qty_bytes)?;

            // quote_quantity (NUMERIC) - convert Decimal to string representation
            let quote_qty_decimal = trade_info.quantity_quote.as_decimal();
            let quote_qty_str = quote_qty_decimal.to_string();
            let quote_qty_bytes = quote_qty_str.as_bytes();
            row_data.write_all(&(quote_qty_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(quote_qty_bytes)?;

            // spot_price (NUMERIC) - convert Decimal to string representation
            let spot_price_decimal = trade_info.spot_price.as_decimal();
            let spot_price_str = spot_price_decimal.to_string();
            let spot_price_bytes = spot_price_str.as_bytes();
            row_data.write_all(&(spot_price_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(spot_price_bytes)?;

            // execution_price (NUMERIC) - convert Decimal to string representation
            let exec_price_decimal = trade_info.execution_price.as_decimal();
            let exec_price_str = exec_price_decimal.to_string();
            let exec_price_bytes = exec_price_str.as_bytes();
            row_data.write_all(&(exec_price_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(exec_price_bytes)?;
        } else {
            // All 5 fields are NULL when trade_info is not available
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL for order_side
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL for base_quantity
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL for quote_quantity
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL for spot_price
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL for execution_price
        }

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
        Ok(())
    }

    /// Writes a single pool liquidity update in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_pool_liquidity_update_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        update: &PoolLiquidityUpdate,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (14)
        row_data.write_all(&14u16.to_be_bytes())?;

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&chain_id_bytes)?;

        // Field 2: pool_address (TEXT)
        let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&pool_address_bytes)?;

        // Field 3: block (INT8)
        let block_bytes = (update.block as i64).to_be_bytes();
        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&block_bytes)?;

        // Field 4: transaction_hash (TEXT)
        let tx_hash_bytes = update.transaction_hash.as_bytes();
        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(tx_hash_bytes)?;

        // Field 5: transaction_index (INT4)
        let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tx_index_bytes)?;

        // Field 6: log_index (INT4)
        let log_index_bytes = (update.log_index as i32).to_be_bytes();
        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&log_index_bytes)?;

        // Field 7: event_type (TEXT)
        let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
        row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&event_type_bytes)?;

        // Field 8: sender (TEXT, nullable)
        if let Some(sender) = update.sender {
            let sender_bytes = sender.to_string().as_bytes().to_vec();
            row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&sender_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
        }

        // Field 9: owner (TEXT)
        let owner_bytes = update.owner.to_string().as_bytes().to_vec();
        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&owner_bytes)?;

        // Field 10: position_liquidity (U128)
        let position_liquidity_bytes = format_numeric(&update.position_liquidity)
            .as_bytes()
            .to_vec();
        row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&position_liquidity_bytes)?;

        // Field 11: amount0 (U256)
        let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&amount0_bytes)?;

        // Field 12: amount1 (U256)
        let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&amount1_bytes)?;

        // Field 13: tick_lower (INT4)
        let tick_lower_bytes = update.tick_lower.to_be_bytes();
        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tick_lower_bytes)?;

        // Field 14: tick_upper (INT4)
        let tick_upper_bytes = update.tick_upper.to_be_bytes();
        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tick_upper_bytes)?;

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
        Ok(())
    }

    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
    ///
    /// # Errors
    ///
    /// Returns an error if the COPY operation fails.
    pub async fn copy_pool_collects(
        &self,
        chain_id: u32,
        collects: &[PoolFeeCollect],
    ) -> anyhow::Result<()> {
        if collects.is_empty() {
            return Ok(());
        }

        let copy_statement = r"
            COPY pool_collect_event (
                chain_id, pool_address, block, transaction_hash, transaction_index,
                log_index, owner, amount0, amount1, tick_lower, tick_upper
            ) FROM STDIN WITH (FORMAT BINARY)";

        let mut copy_in = self
            .pool
            .copy_in_raw(copy_statement)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;

        // Write binary header
        self.write_copy_header(&mut copy_in).await?;

        // Write each collect event as binary data
        for collect in collects {
            self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
                .await?;
        }

        // Write binary trailer
        self.write_copy_trailer(&mut copy_in).await?;

        // Finish the COPY operation
        copy_in.finish().await.map_err(|e| {
            // Log detailed information about the failed batch
            tracing::error!("COPY operation failed for pool_collect_event batch:");
            tracing::error!("  Chain ID: {}", chain_id);
            tracing::error!("  Batch size: {}", collects.len());

            if !collects.is_empty() {
                tracing::error!(
                    "  Block range: {} to {}",
                    collects.iter().map(|c| c.block).min().unwrap_or(0),
                    collects.iter().map(|c| c.block).max().unwrap_or(0)
                );
            }

            // Log first few collects with key details
            for (i, collect) in collects.iter().take(5).enumerate() {
                tracing::error!(
                    "  Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
                    i,
                    collect.transaction_hash,
                    collect.log_index,
                    collect.block,
                    collect.pool_address,
                    collect.owner
                );
            }

            if collects.len() > 5 {
                tracing::error!("  ... and {} more collects", collects.len() - 5);
            }

            anyhow::anyhow!("Failed to finish COPY operation: {e}")
        })?;

        Ok(())
    }

    /// Writes a single pool fee collect in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_pool_fee_collect_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        collect: &PoolFeeCollect,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (11)
        row_data.write_all(&11u16.to_be_bytes())?;

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&chain_id_bytes)?;

        // Field 2: pool_address (TEXT)
        let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
        row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&pool_address_bytes)?;

        // Field 3: block (INT8)
        let block_bytes = (collect.block as i64).to_be_bytes();
        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&block_bytes)?;

        // Field 4: transaction_hash (TEXT)
        let tx_hash_bytes = collect.transaction_hash.as_bytes();
        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(tx_hash_bytes)?;

        // Field 5: transaction_index (INT4)
        let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tx_index_bytes)?;

        // Field 6: log_index (INT4)
        let log_index_bytes = (collect.log_index as i32).to_be_bytes();
        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&log_index_bytes)?;

        // Field 7: owner (TEXT)
        let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&owner_bytes)?;

        // Field 8: amount0 (U256)
        let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
        row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&fee0_bytes)?;

        // Field 9: amount1 (U256)
        let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
        row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&fee1_bytes)?;

        // Field 10: tick_lower (INT4)
        let tick_lower_bytes = collect.tick_lower.to_be_bytes();
        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tick_lower_bytes)?;

        // Field 11: tick_upper (INT4)
        let tick_upper_bytes = collect.tick_upper.to_be_bytes();
        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&tick_upper_bytes)?;

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
        Ok(())
    }

    /// Writes a single token in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_token_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        token: &Token,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (5)
        row_data.write_all(&5u16.to_be_bytes())?;

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&chain_id_bytes)?;

        // Field 2: address (TEXT)
        let address_bytes = token.address.to_string().as_bytes().to_vec();
        row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&address_bytes)?;

        // Field 3: name (TEXT)
        let name_bytes = token.name.as_bytes();
        row_data.write_all(&(name_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(name_bytes)?;

        // Field 4: symbol (TEXT)
        let symbol_bytes = token.symbol.as_bytes();
        row_data.write_all(&(symbol_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(symbol_bytes)?;

        // Field 5: decimals (INT4)
        let decimals_bytes = (i32::from(token.decimals)).to_be_bytes();
        row_data.write_all(&(decimals_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&decimals_bytes)?;

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write token data: {e}"))?;
        Ok(())
    }

    /// Writes a single pool in PostgreSQL binary format.
    ///
    /// Each row in binary format consists of:
    /// - 2-byte field count
    /// - For each field: 4-byte length followed by data (or -1 for NULL)
    async fn write_pool_binary(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
        chain_id: u32,
        pool: &Pool,
    ) -> anyhow::Result<()> {
        use std::io::Write;
        let mut row_data = Vec::new();

        // Number of fields (12)
        row_data.write_all(&12u16.to_be_bytes())?;

        // Field 1: chain_id (INT4)
        let chain_id_bytes = (chain_id as i32).to_be_bytes();
        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&chain_id_bytes)?;

        // Field 2: dex_name (TEXT)
        let dex_name_bytes = pool.dex.name.to_string().as_bytes().to_vec();
        row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&dex_name_bytes)?;

        // Field 3: address (TEXT)
        let address_bytes = pool.address.to_string().as_bytes().to_vec();
        row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&address_bytes)?;

        // Field 4: creation_block (INT8)
        let creation_block_bytes = (pool.creation_block as i64).to_be_bytes();
        row_data.write_all(&(creation_block_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&creation_block_bytes)?;

        // Field 5: token0_chain (INT4)
        let token0_chain_bytes = (pool.token0.chain.chain_id as i32).to_be_bytes();
        row_data.write_all(&(token0_chain_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&token0_chain_bytes)?;

        // Field 6: token0_address (TEXT)
        let token0_address_bytes = pool.token0.address.to_string().as_bytes().to_vec();
        row_data.write_all(&(token0_address_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&token0_address_bytes)?;

        // Field 7: token1_chain (INT4)
        let token1_chain_bytes = (pool.token1.chain.chain_id as i32).to_be_bytes();
        row_data.write_all(&(token1_chain_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&token1_chain_bytes)?;

        // Field 8: token1_address (TEXT)
        let token1_address_bytes = pool.token1.address.to_string().as_bytes().to_vec();
        row_data.write_all(&(token1_address_bytes.len() as i32).to_be_bytes())?;
        row_data.write_all(&token1_address_bytes)?;

        // Field 9: fee (INT4, nullable)
        if let Some(fee) = pool.fee {
            let fee_bytes = (fee as i32).to_be_bytes();
            row_data.write_all(&(fee_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&fee_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
        }

        // Field 10: tick_spacing (INT4, nullable)
        if let Some(tick_spacing) = pool.tick_spacing {
            let tick_spacing_bytes = (tick_spacing as i32).to_be_bytes();
            row_data.write_all(&(tick_spacing_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&tick_spacing_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
        }

        // Field 11: initial_tick (INT4, nullable)
        if let Some(initial_tick) = pool.initial_tick {
            let initial_tick_bytes = initial_tick.to_be_bytes();
            row_data.write_all(&(initial_tick_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&initial_tick_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
        }

        // Field 12: initial_sqrt_price_x96 (TEXT, nullable)
        if let Some(ref initial_sqrt_price) = pool.initial_sqrt_price_x96 {
            let sqrt_price_bytes = format_numeric(initial_sqrt_price).as_bytes().to_vec();
            row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
            row_data.write_all(&sqrt_price_bytes)?;
        } else {
            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
        }

        copy_in
            .send(row_data)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write pool data: {e}"))?;
        Ok(())
    }

    /// Writes the PostgreSQL COPY binary format trailer.
    ///
    /// The trailer is a 2-byte value of -1 to indicate end of data.
    async fn write_copy_trailer(
        &self,
        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    ) -> anyhow::Result<()> {
        // Binary trailer: -1 as i16 to indicate end of data
        let trailer = (-1i16).to_be_bytes();
        copy_in
            .send(trailer.to_vec())
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
        Ok(())
    }
}
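
// A minimal sketch of unit tests for the numeric formatting helpers, added for
// illustration; the assertions rely only on behavior visible in this module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn format_numeric_strips_leading_plus() {
        assert_eq!(format_numeric(&"+12345"), "12345");
    }

    #[test]
    fn format_numeric_passes_through_plain_decimal_strings() {
        // Values larger than any native integer are passed through unchanged
        assert_eq!(
            format_numeric(&"340282366920938463463374607431768211455"),
            "340282366920938463463374607431768211455"
        );
        assert_eq!(format_numeric(&"-42"), "-42");
    }
}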