nautilus_blockchain/cache/
copy.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! PostgreSQL COPY BINARY operations for high-performance bulk data loading.
17//!
18//! This module provides utilities for using PostgreSQL's COPY command with binary format,
19//! which offers significantly better performance than standard INSERT operations for bulk data loading.
20
21use nautilus_model::defi::{
22    Block, Pool, PoolLiquidityUpdate, PoolSwap, Token, data::PoolFeeCollect,
23};
24use sqlx::{PgPool, postgres::PgPoolCopyExt};
25
26// Helper to convert scientific notation to decimal
27fn format_scientific_to_decimal(s: &str) -> String {
28    use std::str::FromStr;
29
30    use rust_decimal::Decimal;
31
32    match Decimal::from_str(s) {
33        Ok(decimal) => decimal.to_string(),
34        Err(_) => s.to_string(), // Fallback
35    }
36}
37
38/// Formats a numeric value for PostgreSQL NUMERIC type
39fn format_numeric<T: ToString>(value: &T) -> String {
40    let s = value.to_string();
41
42    // Remove any '+' prefix
43    let s = s.trim_start_matches('+');
44
45    // Handle scientific notation by converting to decimal
46    if s.contains('e') || s.contains('E') {
47        return format_scientific_to_decimal(s);
48    }
49
50    // For very large numbers that rust_decimal can't handle,
51    // just return the cleaned string since alloy_primitives already
52    // produces clean decimal format
53    s.to_string()
54}
55
56/// Handles PostgreSQL COPY BINARY operations for blockchain data.
57#[derive(Debug)]
58pub struct PostgresCopyHandler<'a> {
59    pool: &'a PgPool,
60}
61
62impl<'a> PostgresCopyHandler<'a> {
63    /// Creates a new COPY handler with a reference to the database pool.
64    #[must_use]
65    pub const fn new(pool: &'a PgPool) -> Self {
66        Self { pool }
67    }
68
69    /// Inserts blocks using PostgreSQL COPY BINARY for maximum performance.
70    ///
71    /// This method is significantly faster than INSERT for bulk operations as it bypasses
72    /// SQL parsing and uses PostgreSQL's native binary protocol.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the COPY operation fails.
77    pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
78        if blocks.is_empty() {
79            return Ok(());
80        }
81
82        let copy_statement = r"
83            COPY block (
84                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
85                base_fee_per_gas, blob_gas_used, excess_blob_gas,
86                l1_gas_price, l1_gas_used, l1_fee_scalar
87            ) FROM STDIN WITH (FORMAT BINARY)";
88
89        let mut copy_in = self
90            .pool
91            .copy_in_raw(copy_statement)
92            .await
93            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
94
95        // Write binary header
96        self.write_copy_header(&mut copy_in).await?;
97
98        // Write each block as binary data
99        for block in blocks {
100            self.write_block_binary(&mut copy_in, chain_id, block)
101                .await?;
102        }
103
104        // Write binary trailer
105        self.write_copy_trailer(&mut copy_in).await?;
106
107        // Finish the COPY operation
108        copy_in
109            .finish()
110            .await
111            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
112
113        Ok(())
114    }
115
116    /// Inserts tokens using PostgreSQL COPY BINARY for maximum performance.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the COPY operation fails.
121    pub async fn copy_tokens(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
122        if tokens.is_empty() {
123            return Ok(());
124        }
125
126        let copy_statement = r"
127            COPY token (
128                chain_id, address, name, symbol, decimals
129            ) FROM STDIN WITH (FORMAT BINARY)";
130
131        let mut copy_in = self
132            .pool
133            .copy_in_raw(copy_statement)
134            .await
135            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
136
137        self.write_copy_header(&mut copy_in).await?;
138        for token in tokens {
139            self.write_token_binary(&mut copy_in, chain_id, token)
140                .await?;
141        }
142        self.write_copy_trailer(&mut copy_in).await?;
143        copy_in
144            .finish()
145            .await
146            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
147
148        Ok(())
149    }
150
151    /// Inserts pools using PostgreSQL COPY BINARY for maximum performance.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the COPY operation fails.
156    pub async fn copy_pools(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
157        if pools.is_empty() {
158            return Ok(());
159        }
160
161        let copy_statement = r"
162            COPY pool (
163                chain_id, dex_name, address, pool_identifier, creation_block,
164                token0_chain, token0_address, token1_chain, token1_address,
165                fee, tick_spacing, initial_tick, initial_sqrt_price_x96, hook_address
166            ) FROM STDIN WITH (FORMAT BINARY)";
167
168        let mut copy_in = self
169            .pool
170            .copy_in_raw(copy_statement)
171            .await
172            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
173
174        self.write_copy_header(&mut copy_in).await?;
175        for pool in pools {
176            self.write_pool_binary(&mut copy_in, chain_id, pool).await?;
177        }
178        self.write_copy_trailer(&mut copy_in).await?;
179        copy_in
180            .finish()
181            .await
182            .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
183
184        Ok(())
185    }
186
187    /// Inserts pool swaps using PostgreSQL COPY BINARY for maximum performance.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the COPY operation fails.
192    pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
193        if swaps.is_empty() {
194            return Ok(());
195        }
196
197        let copy_statement = r"
198            COPY pool_swap_event (
199                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
200                log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
201                order_side, base_quantity, quote_quantity, spot_price, execution_price
202            ) FROM STDIN WITH (FORMAT BINARY)";
203
204        let mut copy_in = self
205            .pool
206            .copy_in_raw(copy_statement)
207            .await
208            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
209
210        // Write binary header
211        self.write_copy_header(&mut copy_in).await?;
212
213        // Write each swap as binary data
214        for swap in swaps {
215            self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
216                .await?;
217        }
218
219        // Write binary trailer
220        self.write_copy_trailer(&mut copy_in).await?;
221
222        // Finish the COPY operation
223        copy_in.finish().await.map_err(|e| {
224            // Log detailed information about the failed batch
225            tracing::error!("COPY operation failed for pool_swap batch:");
226            tracing::error!("  Chain ID: {}", chain_id);
227            tracing::error!("  Batch size: {}", swaps.len());
228
229            if !swaps.is_empty() {
230                tracing::error!(
231                    "  Block range: {} to {}",
232                    swaps.iter().map(|s| s.block).min().unwrap_or(0),
233                    swaps.iter().map(|s| s.block).max().unwrap_or(0)
234                );
235            }
236
237            // Log first few swaps with key details
238            for (i, swap) in swaps.iter().take(5).enumerate() {
239                tracing::error!(
240                    "  Swap[{}]: tx={} log_idx={} block={} pool={}",
241                    i,
242                    swap.transaction_hash,
243                    swap.log_index,
244                    swap.block,
245                    swap.instrument_id.to_string()
246                );
247            }
248
249            if swaps.len() > 5 {
250                tracing::error!("  ... and {} more swaps", swaps.len() - 5);
251            }
252
253            anyhow::anyhow!("Failed to finish COPY operation: {e}")
254        })?;
255
256        Ok(())
257    }
258
259    /// Inserts pool liquidity updates using PostgreSQL COPY BINARY for maximum performance.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the COPY operation fails.
264    pub async fn copy_pool_liquidity_updates(
265        &self,
266        chain_id: u32,
267        updates: &[PoolLiquidityUpdate],
268    ) -> anyhow::Result<()> {
269        if updates.is_empty() {
270            return Ok(());
271        }
272
273        let copy_statement = r"
274            COPY pool_liquidity_event (
275                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
276                log_index, event_type, sender, owner, position_liquidity,
277                amount0, amount1, tick_lower, tick_upper
278            ) FROM STDIN WITH (FORMAT BINARY)";
279
280        let mut copy_in = self
281            .pool
282            .copy_in_raw(copy_statement)
283            .await
284            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
285
286        // Write binary header
287        self.write_copy_header(&mut copy_in).await?;
288
289        // Write each liquidity update as binary data
290        for update in updates {
291            self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
292                .await?;
293        }
294
295        // Write binary trailer
296        self.write_copy_trailer(&mut copy_in).await?;
297
298        // Finish the COPY operation
299        copy_in.finish().await.map_err(|e| {
300            // Log detailed information about the failed batch
301            tracing::error!("COPY operation failed for pool_liquidity batch:");
302            tracing::error!("  Chain ID: {}", chain_id);
303            tracing::error!("  Batch size: {}", updates.len());
304
305            if !updates.is_empty() {
306                tracing::error!(
307                    "  Block range: {} to {}",
308                    updates.iter().map(|u| u.block).min().unwrap_or(0),
309                    updates.iter().map(|u| u.block).max().unwrap_or(0)
310                );
311            }
312
313            // Log first few liquidity updates with key details
314            for (i, update) in updates.iter().take(5).enumerate() {
315                tracing::error!(
316                    "  Update[{}]: tx={} log_idx={} block={} pool={} type={}",
317                    i,
318                    update.transaction_hash,
319                    update.log_index,
320                    update.block,
321                    update.pool_identifier,
322                    update.kind
323                );
324            }
325
326            if updates.len() > 5 {
327                tracing::error!("  ... and {} more updates", updates.len() - 5);
328            }
329
330            anyhow::anyhow!("Failed to finish COPY operation: {e}")
331        })?;
332
333        Ok(())
334    }
335
336    /// Writes the PostgreSQL COPY binary format header.
337    ///
338    /// The header consists of:
339    /// - 11-byte signature: "PGCOPY\n\xff\r\n\0"
340    /// - 4-byte flags field (all zeros)
341    /// - 4-byte header extension length (all zeros)
342    async fn write_copy_header(
343        &self,
344        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
345    ) -> anyhow::Result<()> {
346        use std::io::Write;
347        let mut header = Vec::new();
348
349        // PostgreSQL binary copy header
350        header.write_all(b"PGCOPY\n\xff\r\n\0")?; // Signature
351        header.write_all(&[0, 0, 0, 0])?; // Flags field
352        header.write_all(&[0, 0, 0, 0])?; // Header extension length
353
354        copy_in
355            .send(header)
356            .await
357            .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
358        Ok(())
359    }
360
361    /// Writes a single block in PostgreSQL binary format.
362    ///
363    /// Each row in binary format consists of:
364    /// - 2-byte field count
365    /// - For each field: 4-byte length followed by data (or -1 for NULL)
366    async fn write_block_binary(
367        &self,
368        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
369        chain_id: u32,
370        block: &Block,
371    ) -> anyhow::Result<()> {
372        use std::io::Write;
373        let mut row_data = Vec::new();
374
375        // Number of fields (14)
376        row_data.write_all(&14u16.to_be_bytes())?;
377
378        let chain_id_bytes = (chain_id as i32).to_be_bytes();
379        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
380        row_data.write_all(&chain_id_bytes)?;
381
382        let number_bytes = (block.number as i64).to_be_bytes();
383        row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
384        row_data.write_all(&number_bytes)?;
385
386        let hash_bytes = block.hash.as_bytes();
387        row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
388        row_data.write_all(hash_bytes)?;
389
390        let parent_hash_bytes = block.parent_hash.as_bytes();
391        row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
392        row_data.write_all(parent_hash_bytes)?;
393
394        let miner_bytes = block.miner.to_string().as_bytes().to_vec();
395        row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
396        row_data.write_all(&miner_bytes)?;
397
398        let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
399        row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
400        row_data.write_all(&gas_limit_bytes)?;
401
402        let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
403        row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
404        row_data.write_all(&gas_used_bytes)?;
405
406        let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
407        row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
408        row_data.write_all(&timestamp_bytes)?;
409
410        if let Some(ref base_fee) = block.base_fee_per_gas {
411            let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
412            row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
413            row_data.write_all(&base_fee_bytes)?;
414        } else {
415            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
416        }
417
418        if let Some(ref blob_gas) = block.blob_gas_used {
419            let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
420            row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
421            row_data.write_all(&blob_gas_bytes)?;
422        } else {
423            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
424        }
425
426        if let Some(ref excess_blob) = block.excess_blob_gas {
427            let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
428            row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
429            row_data.write_all(&excess_blob_bytes)?;
430        } else {
431            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
432        }
433
434        if let Some(ref l1_gas_price) = block.l1_gas_price {
435            let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
436            row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
437            row_data.write_all(&l1_gas_price_bytes)?;
438        } else {
439            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
440        }
441
442        if let Some(l1_gas_used) = block.l1_gas_used {
443            let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
444            row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
445            row_data.write_all(&l1_gas_used_bytes)?;
446        } else {
447            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
448        }
449
450        if let Some(l1_fee_scalar) = block.l1_fee_scalar {
451            let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
452            row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
453            row_data.write_all(&l1_fee_scalar_bytes)?;
454        } else {
455            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
456        }
457
458        copy_in
459            .send(row_data)
460            .await
461            .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
462        Ok(())
463    }
464
465    /// Writes a single pool swap in PostgreSQL binary format.
466    ///
467    /// Each row in binary format consists of:
468    /// - 2-byte field count
469    /// - For each field: 4-byte length followed by data (or -1 for NULL)
470    async fn write_pool_swap_binary(
471        &self,
472        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
473        chain_id: u32,
474        swap: &PoolSwap,
475    ) -> anyhow::Result<()> {
476        use std::io::Write;
477        let mut row_data = Vec::new();
478
479        row_data.write_all(&19u16.to_be_bytes())?;
480
481        let chain_id_bytes = (chain_id as i32).to_be_bytes();
482        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
483        row_data.write_all(&chain_id_bytes)?;
484
485        let dex_name_bytes = swap.dex.name.to_string().as_bytes().to_vec();
486        row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
487        row_data.write_all(&dex_name_bytes)?;
488
489        let pool_identifier = swap.instrument_id.to_string();
490        let pool_identifier_bytes = pool_identifier.as_bytes();
491        row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
492        row_data.write_all(pool_identifier_bytes)?;
493
494        let block_bytes = (swap.block as i64).to_be_bytes();
495        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
496        row_data.write_all(&block_bytes)?;
497
498        let tx_hash_bytes = swap.transaction_hash.as_bytes();
499        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
500        row_data.write_all(tx_hash_bytes)?;
501
502        let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
503        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
504        row_data.write_all(&tx_index_bytes)?;
505
506        let log_index_bytes = (swap.log_index as i32).to_be_bytes();
507        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
508        row_data.write_all(&log_index_bytes)?;
509
510        let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
511        row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
512        row_data.write_all(&sender_bytes)?;
513
514        let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
515        row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
516        row_data.write_all(&recipient_bytes)?;
517
518        let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
519        row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
520        row_data.write_all(&sqrt_price_bytes)?;
521
522        let liquidity_bytes = format_numeric(&swap.liquidity).as_bytes().to_vec();
523        row_data.write_all(&(liquidity_bytes.len() as i32).to_be_bytes())?;
524        row_data.write_all(&liquidity_bytes)?;
525
526        let tick_bytes = swap.tick.to_be_bytes();
527        row_data.write_all(&(tick_bytes.len() as i32).to_be_bytes())?;
528        row_data.write_all(&tick_bytes)?;
529
530        let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
531        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
532        row_data.write_all(&amount0_bytes)?;
533
534        let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
535        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
536        row_data.write_all(&amount1_bytes)?;
537
538        if let Some(trade_info) = &swap.trade_info {
539            let side_bytes = trade_info.order_side.to_string().as_bytes().to_vec();
540            row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
541            row_data.write_all(&side_bytes)?;
542
543            let base_qty_decimal = trade_info.quantity_base.as_decimal();
544            let base_qty_str = base_qty_decimal.to_string();
545            let base_qty_bytes = base_qty_str.as_bytes();
546            row_data.write_all(&(base_qty_bytes.len() as i32).to_be_bytes())?;
547            row_data.write_all(base_qty_bytes)?;
548
549            let quote_qty_decimal = trade_info.quantity_quote.as_decimal();
550            let quote_qty_str = quote_qty_decimal.to_string();
551            let quote_qty_bytes = quote_qty_str.as_bytes();
552            row_data.write_all(&(quote_qty_bytes.len() as i32).to_be_bytes())?;
553            row_data.write_all(quote_qty_bytes)?;
554
555            let spot_price_decimal = trade_info.spot_price.as_decimal();
556            let spot_price_str = spot_price_decimal.to_string();
557            let spot_price_bytes = spot_price_str.as_bytes();
558            row_data.write_all(&(spot_price_bytes.len() as i32).to_be_bytes())?;
559            row_data.write_all(spot_price_bytes)?;
560
561            let exec_price_decimal = trade_info.execution_price.as_decimal();
562            let exec_price_str = exec_price_decimal.to_string();
563            let exec_price_bytes = exec_price_str.as_bytes();
564            row_data.write_all(&(exec_price_bytes.len() as i32).to_be_bytes())?;
565            row_data.write_all(exec_price_bytes)?;
566        } else {
567            row_data.write_all(&(-1i32).to_be_bytes())?;
568            row_data.write_all(&(-1i32).to_be_bytes())?;
569            row_data.write_all(&(-1i32).to_be_bytes())?;
570            row_data.write_all(&(-1i32).to_be_bytes())?;
571            row_data.write_all(&(-1i32).to_be_bytes())?;
572        }
573
574        copy_in
575            .send(row_data)
576            .await
577            .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
578        Ok(())
579    }
580
581    /// Writes a single pool liquidity update in PostgreSQL binary format.
582    ///
583    /// Each row in binary format consists of:
584    /// - 2-byte field count
585    /// - For each field: 4-byte length followed by data (or -1 for NULL)
586    async fn write_pool_liquidity_update_binary(
587        &self,
588        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
589        chain_id: u32,
590        update: &PoolLiquidityUpdate,
591    ) -> anyhow::Result<()> {
592        use std::io::Write;
593        let mut row_data = Vec::new();
594
595        row_data.write_all(&15u16.to_be_bytes())?;
596
597        let chain_id_bytes = (chain_id as i32).to_be_bytes();
598        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
599        row_data.write_all(&chain_id_bytes)?;
600
601        let dex_name_bytes = update.dex.name.to_string().as_bytes().to_vec();
602        row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
603        row_data.write_all(&dex_name_bytes)?;
604
605        let pool_identifier = update.instrument_id.to_string();
606        let pool_identifier_bytes = pool_identifier.as_bytes();
607        row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
608        row_data.write_all(pool_identifier_bytes)?;
609
610        let block_bytes = (update.block as i64).to_be_bytes();
611        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
612        row_data.write_all(&block_bytes)?;
613
614        let tx_hash_bytes = update.transaction_hash.as_bytes();
615        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
616        row_data.write_all(tx_hash_bytes)?;
617
618        let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
619        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
620        row_data.write_all(&tx_index_bytes)?;
621
622        let log_index_bytes = (update.log_index as i32).to_be_bytes();
623        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
624        row_data.write_all(&log_index_bytes)?;
625
626        let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
627        row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
628        row_data.write_all(&event_type_bytes)?;
629
630        if let Some(sender) = update.sender {
631            let sender_bytes = sender.to_string().as_bytes().to_vec();
632            row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
633            row_data.write_all(&sender_bytes)?;
634        } else {
635            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL value
636        }
637
638        let owner_bytes = update.owner.to_string().as_bytes().to_vec();
639        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
640        row_data.write_all(&owner_bytes)?;
641
642        let position_liquidity_bytes = format_numeric(&update.position_liquidity)
643            .as_bytes()
644            .to_vec();
645        row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
646        row_data.write_all(&position_liquidity_bytes)?;
647
648        let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
649        row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
650        row_data.write_all(&amount0_bytes)?;
651
652        let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
653        row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
654        row_data.write_all(&amount1_bytes)?;
655
656        let tick_lower_bytes = update.tick_lower.to_be_bytes();
657        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
658        row_data.write_all(&tick_lower_bytes)?;
659
660        let tick_upper_bytes = update.tick_upper.to_be_bytes();
661        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
662        row_data.write_all(&tick_upper_bytes)?;
663
664        copy_in
665            .send(row_data)
666            .await
667            .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
668        Ok(())
669    }
670
671    /// Inserts pool fee collect events using PostgreSQL COPY BINARY for maximum performance.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if the COPY operation fails.
676    pub async fn copy_pool_collects(
677        &self,
678        chain_id: u32,
679        collects: &[PoolFeeCollect],
680    ) -> anyhow::Result<()> {
681        if collects.is_empty() {
682            return Ok(());
683        }
684
685        let copy_statement = r"
686            COPY pool_collect_event (
687                chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
688                log_index, owner, amount0, amount1, tick_lower, tick_upper
689            ) FROM STDIN WITH (FORMAT BINARY)";
690
691        let mut copy_in = self
692            .pool
693            .copy_in_raw(copy_statement)
694            .await
695            .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
696
697        // Write binary header
698        self.write_copy_header(&mut copy_in).await?;
699
700        // Write each collect event as binary data
701        for collect in collects {
702            self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
703                .await?;
704        }
705
706        // Write binary trailer
707        self.write_copy_trailer(&mut copy_in).await?;
708
709        // Finish the COPY operation
710        copy_in.finish().await.map_err(|e| {
711            // Log detailed information about the failed batch
712            tracing::error!("COPY operation failed for temp_pool_collect batch:");
713            tracing::error!("  Chain ID: {}", chain_id);
714            tracing::error!("  Batch size: {}", collects.len());
715
716            if !collects.is_empty() {
717                tracing::error!(
718                    "  Block range: {} to {}",
719                    collects.iter().map(|c| c.block).min().unwrap_or(0),
720                    collects.iter().map(|c| c.block).max().unwrap_or(0)
721                );
722            }
723
724            // Log first few collects with key details
725            for (i, collect) in collects.iter().take(5).enumerate() {
726                tracing::error!(
727                    "  Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
728                    i,
729                    collect.transaction_hash,
730                    collect.log_index,
731                    collect.block,
732                    collect.pool_identifier,
733                    collect.owner
734                );
735            }
736
737            if collects.len() > 5 {
738                tracing::error!("  ... and {} more collects", collects.len() - 5);
739            }
740
741            anyhow::anyhow!("Failed to finish COPY operation: {e}")
742        })?;
743
744        Ok(())
745    }
746
747    /// Writes a single pool fee collect in PostgreSQL binary format.
748    ///
749    /// Each row in binary format consists of:
750    /// - 2-byte field count
751    /// - For each field: 4-byte length followed by data (or -1 for NULL)
752    async fn write_pool_fee_collect_binary(
753        &self,
754        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
755        chain_id: u32,
756        collect: &PoolFeeCollect,
757    ) -> anyhow::Result<()> {
758        use std::io::Write;
759        let mut row_data = Vec::new();
760
761        row_data.write_all(&12u16.to_be_bytes())?;
762
763        let chain_id_bytes = (chain_id as i32).to_be_bytes();
764        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
765        row_data.write_all(&chain_id_bytes)?;
766
767        let dex_name_bytes = collect.dex.name.to_string().as_bytes().to_vec();
768        row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
769        row_data.write_all(&dex_name_bytes)?;
770
771        let pool_identifier = collect.instrument_id.to_string();
772        let pool_identifier_bytes = pool_identifier.as_bytes();
773        row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
774        row_data.write_all(pool_identifier_bytes)?;
775
776        let block_bytes = (collect.block as i64).to_be_bytes();
777        row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
778        row_data.write_all(&block_bytes)?;
779
780        let tx_hash_bytes = collect.transaction_hash.as_bytes();
781        row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
782        row_data.write_all(tx_hash_bytes)?;
783
784        let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
785        row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
786        row_data.write_all(&tx_index_bytes)?;
787
788        let log_index_bytes = (collect.log_index as i32).to_be_bytes();
789        row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
790        row_data.write_all(&log_index_bytes)?;
791
792        let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
793        row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
794        row_data.write_all(&owner_bytes)?;
795
796        let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
797        row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
798        row_data.write_all(&fee0_bytes)?;
799
800        let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
801        row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
802        row_data.write_all(&fee1_bytes)?;
803
804        let tick_lower_bytes = collect.tick_lower.to_be_bytes();
805        row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
806        row_data.write_all(&tick_lower_bytes)?;
807
808        let tick_upper_bytes = collect.tick_upper.to_be_bytes();
809        row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
810        row_data.write_all(&tick_upper_bytes)?;
811
812        copy_in
813            .send(row_data)
814            .await
815            .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
816        Ok(())
817    }
818
819    /// Writes a single token in PostgreSQL binary format.
820    ///
821    /// Each row in binary format consists of:
822    /// - 2-byte field count
823    /// - For each field: 4-byte length followed by data (or -1 for NULL)
824    async fn write_token_binary(
825        &self,
826        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
827        chain_id: u32,
828        token: &Token,
829    ) -> anyhow::Result<()> {
830        use std::io::Write;
831        let mut row_data = Vec::new();
832
833        row_data.write_all(&5u16.to_be_bytes())?;
834
835        let chain_id_bytes = (chain_id as i32).to_be_bytes();
836        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
837        row_data.write_all(&chain_id_bytes)?;
838
839        let address_bytes = token.address.to_string().as_bytes().to_vec();
840        row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
841        row_data.write_all(&address_bytes)?;
842
843        let name_bytes = token.name.as_bytes();
844        row_data.write_all(&(name_bytes.len() as i32).to_be_bytes())?;
845        row_data.write_all(name_bytes)?;
846
847        let symbol_bytes = token.symbol.as_bytes();
848        row_data.write_all(&(symbol_bytes.len() as i32).to_be_bytes())?;
849        row_data.write_all(symbol_bytes)?;
850
851        let decimals_bytes = (i32::from(token.decimals)).to_be_bytes();
852        row_data.write_all(&(decimals_bytes.len() as i32).to_be_bytes())?;
853        row_data.write_all(&decimals_bytes)?;
854
855        copy_in
856            .send(row_data)
857            .await
858            .map_err(|e| anyhow::anyhow!("Failed to write token data: {e}"))?;
859        Ok(())
860    }
861
862    /// Writes a single pool in PostgreSQL binary format.
863    ///
864    /// Each row in binary format consists of:
865    /// - 2-byte field count
866    /// - For each field: 4-byte length followed by data (or -1 for NULL)
867    async fn write_pool_binary(
868        &self,
869        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
870        chain_id: u32,
871        pool: &Pool,
872    ) -> anyhow::Result<()> {
873        use std::io::Write;
874        let mut row_data = Vec::new();
875
876        row_data.write_all(&14u16.to_be_bytes())?;
877
878        let chain_id_bytes = (chain_id as i32).to_be_bytes();
879        row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
880        row_data.write_all(&chain_id_bytes)?;
881
882        let dex_name_bytes = pool.dex.name.to_string().as_bytes().to_vec();
883        row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
884        row_data.write_all(&dex_name_bytes)?;
885
886        let address_bytes = pool.address.to_string().as_bytes().to_vec();
887        row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
888        row_data.write_all(&address_bytes)?;
889
890        let pool_identifier_bytes = pool.pool_identifier.as_str().as_bytes();
891        row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
892        row_data.write_all(pool_identifier_bytes)?;
893
894        let creation_block_bytes = (pool.creation_block as i64).to_be_bytes();
895        row_data.write_all(&(creation_block_bytes.len() as i32).to_be_bytes())?;
896        row_data.write_all(&creation_block_bytes)?;
897
898        let token0_chain_bytes = (pool.token0.chain.chain_id as i32).to_be_bytes();
899        row_data.write_all(&(token0_chain_bytes.len() as i32).to_be_bytes())?;
900        row_data.write_all(&token0_chain_bytes)?;
901
902        let token0_address_bytes = pool.token0.address.to_string().as_bytes().to_vec();
903        row_data.write_all(&(token0_address_bytes.len() as i32).to_be_bytes())?;
904        row_data.write_all(&token0_address_bytes)?;
905
906        let token1_chain_bytes = (pool.token1.chain.chain_id as i32).to_be_bytes();
907        row_data.write_all(&(token1_chain_bytes.len() as i32).to_be_bytes())?;
908        row_data.write_all(&token1_chain_bytes)?;
909
910        let token1_address_bytes = pool.token1.address.to_string().as_bytes().to_vec();
911        row_data.write_all(&(token1_address_bytes.len() as i32).to_be_bytes())?;
912        row_data.write_all(&token1_address_bytes)?;
913
914        if let Some(fee) = pool.fee {
915            let fee_bytes = (fee as i32).to_be_bytes();
916            row_data.write_all(&(fee_bytes.len() as i32).to_be_bytes())?;
917            row_data.write_all(&fee_bytes)?;
918        } else {
919            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
920        }
921
922        if let Some(tick_spacing) = pool.tick_spacing {
923            let tick_spacing_bytes = (tick_spacing as i32).to_be_bytes();
924            row_data.write_all(&(tick_spacing_bytes.len() as i32).to_be_bytes())?;
925            row_data.write_all(&tick_spacing_bytes)?;
926        } else {
927            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
928        }
929
930        if let Some(initial_tick) = pool.initial_tick {
931            let initial_tick_bytes = initial_tick.to_be_bytes();
932            row_data.write_all(&(initial_tick_bytes.len() as i32).to_be_bytes())?;
933            row_data.write_all(&initial_tick_bytes)?;
934        } else {
935            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
936        }
937
938        if let Some(ref initial_sqrt_price) = pool.initial_sqrt_price_x96 {
939            let sqrt_price_bytes = format_numeric(initial_sqrt_price).as_bytes().to_vec();
940            row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
941            row_data.write_all(&sqrt_price_bytes)?;
942        } else {
943            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
944        }
945
946        if let Some(ref hooks) = pool.hooks {
947            let hooks_bytes = hooks.to_string().as_bytes().to_vec();
948            row_data.write_all(&(hooks_bytes.len() as i32).to_be_bytes())?;
949            row_data.write_all(&hooks_bytes)?;
950        } else {
951            row_data.write_all(&(-1i32).to_be_bytes())?; // NULL
952        }
953
954        copy_in
955            .send(row_data)
956            .await
957            .map_err(|e| anyhow::anyhow!("Failed to write pool data: {e}"))?;
958        Ok(())
959    }
960
961    /// Writes the PostgreSQL COPY binary format trailer.
962    ///
963    /// The trailer is a 2-byte value of -1 to indicate end of data.
964    async fn write_copy_trailer(
965        &self,
966        copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
967    ) -> anyhow::Result<()> {
968        // Binary trailer: -1 as i16 to indicate end of data
969        let trailer = (-1i16).to_be_bytes();
970        copy_in
971            .send(trailer.to_vec())
972            .await
973            .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
974        Ok(())
975    }
976}