1use nautilus_model::defi::{
22 Block, Pool, PoolLiquidityUpdate, PoolSwap, Token, data::PoolFeeCollect,
23};
24use sqlx::{PgPool, postgres::PgPoolCopyExt};
25
26fn format_scientific_to_decimal(s: &str) -> String {
28 use std::str::FromStr;
29
30 use rust_decimal::Decimal;
31
32 match Decimal::from_str(s) {
33 Ok(decimal) => decimal.to_string(),
34 Err(_) => s.to_string(), }
36}
37
38fn format_numeric<T: ToString>(value: &T) -> String {
40 let s = value.to_string();
41
42 let s = s.trim_start_matches('+');
44
45 if s.contains('e') || s.contains('E') {
47 return format_scientific_to_decimal(s);
48 }
49
50 s.to_string()
54}
55
56#[derive(Debug)]
58pub struct PostgresCopyHandler<'a> {
59 pool: &'a PgPool,
60}
61
62impl<'a> PostgresCopyHandler<'a> {
63 #[must_use]
65 pub const fn new(pool: &'a PgPool) -> Self {
66 Self { pool }
67 }
68
69 pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
78 if blocks.is_empty() {
79 return Ok(());
80 }
81
82 let copy_statement = r"
83 COPY block (
84 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
85 base_fee_per_gas, blob_gas_used, excess_blob_gas,
86 l1_gas_price, l1_gas_used, l1_fee_scalar
87 ) FROM STDIN WITH (FORMAT BINARY)";
88
89 let mut copy_in = self
90 .pool
91 .copy_in_raw(copy_statement)
92 .await
93 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
94
95 self.write_copy_header(&mut copy_in).await?;
97
98 for block in blocks {
100 self.write_block_binary(&mut copy_in, chain_id, block)
101 .await?;
102 }
103
104 self.write_copy_trailer(&mut copy_in).await?;
106
107 copy_in
109 .finish()
110 .await
111 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
112
113 Ok(())
114 }
115
116 pub async fn copy_tokens(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
122 if tokens.is_empty() {
123 return Ok(());
124 }
125
126 let copy_statement = r"
127 COPY token (
128 chain_id, address, name, symbol, decimals
129 ) FROM STDIN WITH (FORMAT BINARY)";
130
131 let mut copy_in = self
132 .pool
133 .copy_in_raw(copy_statement)
134 .await
135 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
136
137 self.write_copy_header(&mut copy_in).await?;
138 for token in tokens {
139 self.write_token_binary(&mut copy_in, chain_id, token)
140 .await?;
141 }
142 self.write_copy_trailer(&mut copy_in).await?;
143 copy_in
144 .finish()
145 .await
146 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
147
148 Ok(())
149 }
150
151 pub async fn copy_pools(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
157 if pools.is_empty() {
158 return Ok(());
159 }
160
161 let copy_statement = r"
162 COPY pool (
163 chain_id, dex_name, address, pool_identifier, creation_block,
164 token0_chain, token0_address, token1_chain, token1_address,
165 fee, tick_spacing, initial_tick, initial_sqrt_price_x96, hook_address
166 ) FROM STDIN WITH (FORMAT BINARY)";
167
168 let mut copy_in = self
169 .pool
170 .copy_in_raw(copy_statement)
171 .await
172 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
173
174 self.write_copy_header(&mut copy_in).await?;
175 for pool in pools {
176 self.write_pool_binary(&mut copy_in, chain_id, pool).await?;
177 }
178 self.write_copy_trailer(&mut copy_in).await?;
179 copy_in
180 .finish()
181 .await
182 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
183
184 Ok(())
185 }
186
187 pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
193 if swaps.is_empty() {
194 return Ok(());
195 }
196
197 let copy_statement = r"
198 COPY pool_swap_event (
199 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
200 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
201 order_side, base_quantity, quote_quantity, spot_price, execution_price
202 ) FROM STDIN WITH (FORMAT BINARY)";
203
204 let mut copy_in = self
205 .pool
206 .copy_in_raw(copy_statement)
207 .await
208 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
209
210 self.write_copy_header(&mut copy_in).await?;
212
213 for swap in swaps {
215 self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
216 .await?;
217 }
218
219 self.write_copy_trailer(&mut copy_in).await?;
221
222 copy_in.finish().await.map_err(|e| {
224 tracing::error!("COPY operation failed for pool_swap batch:");
226 tracing::error!(" Chain ID: {}", chain_id);
227 tracing::error!(" Batch size: {}", swaps.len());
228
229 if !swaps.is_empty() {
230 tracing::error!(
231 " Block range: {} to {}",
232 swaps.iter().map(|s| s.block).min().unwrap_or(0),
233 swaps.iter().map(|s| s.block).max().unwrap_or(0)
234 );
235 }
236
237 for (i, swap) in swaps.iter().take(5).enumerate() {
239 tracing::error!(
240 " Swap[{}]: tx={} log_idx={} block={} pool={}",
241 i,
242 swap.transaction_hash,
243 swap.log_index,
244 swap.block,
245 swap.instrument_id.to_string()
246 );
247 }
248
249 if swaps.len() > 5 {
250 tracing::error!(" ... and {} more swaps", swaps.len() - 5);
251 }
252
253 anyhow::anyhow!("Failed to finish COPY operation: {e}")
254 })?;
255
256 Ok(())
257 }
258
259 pub async fn copy_pool_liquidity_updates(
265 &self,
266 chain_id: u32,
267 updates: &[PoolLiquidityUpdate],
268 ) -> anyhow::Result<()> {
269 if updates.is_empty() {
270 return Ok(());
271 }
272
273 let copy_statement = r"
274 COPY pool_liquidity_event (
275 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
276 log_index, event_type, sender, owner, position_liquidity,
277 amount0, amount1, tick_lower, tick_upper
278 ) FROM STDIN WITH (FORMAT BINARY)";
279
280 let mut copy_in = self
281 .pool
282 .copy_in_raw(copy_statement)
283 .await
284 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
285
286 self.write_copy_header(&mut copy_in).await?;
288
289 for update in updates {
291 self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
292 .await?;
293 }
294
295 self.write_copy_trailer(&mut copy_in).await?;
297
298 copy_in.finish().await.map_err(|e| {
300 tracing::error!("COPY operation failed for pool_liquidity batch:");
302 tracing::error!(" Chain ID: {}", chain_id);
303 tracing::error!(" Batch size: {}", updates.len());
304
305 if !updates.is_empty() {
306 tracing::error!(
307 " Block range: {} to {}",
308 updates.iter().map(|u| u.block).min().unwrap_or(0),
309 updates.iter().map(|u| u.block).max().unwrap_or(0)
310 );
311 }
312
313 for (i, update) in updates.iter().take(5).enumerate() {
315 tracing::error!(
316 " Update[{}]: tx={} log_idx={} block={} pool={} type={}",
317 i,
318 update.transaction_hash,
319 update.log_index,
320 update.block,
321 update.pool_identifier,
322 update.kind
323 );
324 }
325
326 if updates.len() > 5 {
327 tracing::error!(" ... and {} more updates", updates.len() - 5);
328 }
329
330 anyhow::anyhow!("Failed to finish COPY operation: {e}")
331 })?;
332
333 Ok(())
334 }
335
336 async fn write_copy_header(
343 &self,
344 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
345 ) -> anyhow::Result<()> {
346 use std::io::Write;
347 let mut header = Vec::new();
348
349 header.write_all(b"PGCOPY\n\xff\r\n\0")?; header.write_all(&[0, 0, 0, 0])?; header.write_all(&[0, 0, 0, 0])?; copy_in
355 .send(header)
356 .await
357 .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
358 Ok(())
359 }
360
361 async fn write_block_binary(
367 &self,
368 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
369 chain_id: u32,
370 block: &Block,
371 ) -> anyhow::Result<()> {
372 use std::io::Write;
373 let mut row_data = Vec::new();
374
375 row_data.write_all(&14u16.to_be_bytes())?;
377
378 let chain_id_bytes = (chain_id as i32).to_be_bytes();
379 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
380 row_data.write_all(&chain_id_bytes)?;
381
382 let number_bytes = (block.number as i64).to_be_bytes();
383 row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
384 row_data.write_all(&number_bytes)?;
385
386 let hash_bytes = block.hash.as_bytes();
387 row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
388 row_data.write_all(hash_bytes)?;
389
390 let parent_hash_bytes = block.parent_hash.as_bytes();
391 row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
392 row_data.write_all(parent_hash_bytes)?;
393
394 let miner_bytes = block.miner.to_string().as_bytes().to_vec();
395 row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
396 row_data.write_all(&miner_bytes)?;
397
398 let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
399 row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
400 row_data.write_all(&gas_limit_bytes)?;
401
402 let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
403 row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
404 row_data.write_all(&gas_used_bytes)?;
405
406 let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
407 row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
408 row_data.write_all(×tamp_bytes)?;
409
410 if let Some(ref base_fee) = block.base_fee_per_gas {
411 let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
412 row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
413 row_data.write_all(&base_fee_bytes)?;
414 } else {
415 row_data.write_all(&(-1i32).to_be_bytes())?; }
417
418 if let Some(ref blob_gas) = block.blob_gas_used {
419 let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
420 row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
421 row_data.write_all(&blob_gas_bytes)?;
422 } else {
423 row_data.write_all(&(-1i32).to_be_bytes())?; }
425
426 if let Some(ref excess_blob) = block.excess_blob_gas {
427 let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
428 row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
429 row_data.write_all(&excess_blob_bytes)?;
430 } else {
431 row_data.write_all(&(-1i32).to_be_bytes())?; }
433
434 if let Some(ref l1_gas_price) = block.l1_gas_price {
435 let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
436 row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
437 row_data.write_all(&l1_gas_price_bytes)?;
438 } else {
439 row_data.write_all(&(-1i32).to_be_bytes())?; }
441
442 if let Some(l1_gas_used) = block.l1_gas_used {
443 let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
444 row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
445 row_data.write_all(&l1_gas_used_bytes)?;
446 } else {
447 row_data.write_all(&(-1i32).to_be_bytes())?; }
449
450 if let Some(l1_fee_scalar) = block.l1_fee_scalar {
451 let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
452 row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
453 row_data.write_all(&l1_fee_scalar_bytes)?;
454 } else {
455 row_data.write_all(&(-1i32).to_be_bytes())?; }
457
458 copy_in
459 .send(row_data)
460 .await
461 .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
462 Ok(())
463 }
464
465 async fn write_pool_swap_binary(
471 &self,
472 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
473 chain_id: u32,
474 swap: &PoolSwap,
475 ) -> anyhow::Result<()> {
476 use std::io::Write;
477 let mut row_data = Vec::new();
478
479 row_data.write_all(&19u16.to_be_bytes())?;
480
481 let chain_id_bytes = (chain_id as i32).to_be_bytes();
482 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
483 row_data.write_all(&chain_id_bytes)?;
484
485 let dex_name_bytes = swap.dex.name.to_string().as_bytes().to_vec();
486 row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
487 row_data.write_all(&dex_name_bytes)?;
488
489 let pool_identifier = swap.instrument_id.to_string();
490 let pool_identifier_bytes = pool_identifier.as_bytes();
491 row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
492 row_data.write_all(pool_identifier_bytes)?;
493
494 let block_bytes = (swap.block as i64).to_be_bytes();
495 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
496 row_data.write_all(&block_bytes)?;
497
498 let tx_hash_bytes = swap.transaction_hash.as_bytes();
499 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
500 row_data.write_all(tx_hash_bytes)?;
501
502 let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
503 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
504 row_data.write_all(&tx_index_bytes)?;
505
506 let log_index_bytes = (swap.log_index as i32).to_be_bytes();
507 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
508 row_data.write_all(&log_index_bytes)?;
509
510 let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
511 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
512 row_data.write_all(&sender_bytes)?;
513
514 let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
515 row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
516 row_data.write_all(&recipient_bytes)?;
517
518 let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
519 row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
520 row_data.write_all(&sqrt_price_bytes)?;
521
522 let liquidity_bytes = format_numeric(&swap.liquidity).as_bytes().to_vec();
523 row_data.write_all(&(liquidity_bytes.len() as i32).to_be_bytes())?;
524 row_data.write_all(&liquidity_bytes)?;
525
526 let tick_bytes = swap.tick.to_be_bytes();
527 row_data.write_all(&(tick_bytes.len() as i32).to_be_bytes())?;
528 row_data.write_all(&tick_bytes)?;
529
530 let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
531 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
532 row_data.write_all(&amount0_bytes)?;
533
534 let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
535 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
536 row_data.write_all(&amount1_bytes)?;
537
538 if let Some(trade_info) = &swap.trade_info {
539 let side_bytes = trade_info.order_side.to_string().as_bytes().to_vec();
540 row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
541 row_data.write_all(&side_bytes)?;
542
543 let base_qty_decimal = trade_info.quantity_base.as_decimal();
544 let base_qty_str = base_qty_decimal.to_string();
545 let base_qty_bytes = base_qty_str.as_bytes();
546 row_data.write_all(&(base_qty_bytes.len() as i32).to_be_bytes())?;
547 row_data.write_all(base_qty_bytes)?;
548
549 let quote_qty_decimal = trade_info.quantity_quote.as_decimal();
550 let quote_qty_str = quote_qty_decimal.to_string();
551 let quote_qty_bytes = quote_qty_str.as_bytes();
552 row_data.write_all(&(quote_qty_bytes.len() as i32).to_be_bytes())?;
553 row_data.write_all(quote_qty_bytes)?;
554
555 let spot_price_decimal = trade_info.spot_price.as_decimal();
556 let spot_price_str = spot_price_decimal.to_string();
557 let spot_price_bytes = spot_price_str.as_bytes();
558 row_data.write_all(&(spot_price_bytes.len() as i32).to_be_bytes())?;
559 row_data.write_all(spot_price_bytes)?;
560
561 let exec_price_decimal = trade_info.execution_price.as_decimal();
562 let exec_price_str = exec_price_decimal.to_string();
563 let exec_price_bytes = exec_price_str.as_bytes();
564 row_data.write_all(&(exec_price_bytes.len() as i32).to_be_bytes())?;
565 row_data.write_all(exec_price_bytes)?;
566 } else {
567 row_data.write_all(&(-1i32).to_be_bytes())?;
568 row_data.write_all(&(-1i32).to_be_bytes())?;
569 row_data.write_all(&(-1i32).to_be_bytes())?;
570 row_data.write_all(&(-1i32).to_be_bytes())?;
571 row_data.write_all(&(-1i32).to_be_bytes())?;
572 }
573
574 copy_in
575 .send(row_data)
576 .await
577 .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
578 Ok(())
579 }
580
581 async fn write_pool_liquidity_update_binary(
587 &self,
588 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
589 chain_id: u32,
590 update: &PoolLiquidityUpdate,
591 ) -> anyhow::Result<()> {
592 use std::io::Write;
593 let mut row_data = Vec::new();
594
595 row_data.write_all(&15u16.to_be_bytes())?;
596
597 let chain_id_bytes = (chain_id as i32).to_be_bytes();
598 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
599 row_data.write_all(&chain_id_bytes)?;
600
601 let dex_name_bytes = update.dex.name.to_string().as_bytes().to_vec();
602 row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
603 row_data.write_all(&dex_name_bytes)?;
604
605 let pool_identifier = update.instrument_id.to_string();
606 let pool_identifier_bytes = pool_identifier.as_bytes();
607 row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
608 row_data.write_all(pool_identifier_bytes)?;
609
610 let block_bytes = (update.block as i64).to_be_bytes();
611 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
612 row_data.write_all(&block_bytes)?;
613
614 let tx_hash_bytes = update.transaction_hash.as_bytes();
615 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
616 row_data.write_all(tx_hash_bytes)?;
617
618 let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
619 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
620 row_data.write_all(&tx_index_bytes)?;
621
622 let log_index_bytes = (update.log_index as i32).to_be_bytes();
623 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
624 row_data.write_all(&log_index_bytes)?;
625
626 let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
627 row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
628 row_data.write_all(&event_type_bytes)?;
629
630 if let Some(sender) = update.sender {
631 let sender_bytes = sender.to_string().as_bytes().to_vec();
632 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
633 row_data.write_all(&sender_bytes)?;
634 } else {
635 row_data.write_all(&(-1i32).to_be_bytes())?; }
637
638 let owner_bytes = update.owner.to_string().as_bytes().to_vec();
639 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
640 row_data.write_all(&owner_bytes)?;
641
642 let position_liquidity_bytes = format_numeric(&update.position_liquidity)
643 .as_bytes()
644 .to_vec();
645 row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
646 row_data.write_all(&position_liquidity_bytes)?;
647
648 let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
649 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
650 row_data.write_all(&amount0_bytes)?;
651
652 let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
653 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
654 row_data.write_all(&amount1_bytes)?;
655
656 let tick_lower_bytes = update.tick_lower.to_be_bytes();
657 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
658 row_data.write_all(&tick_lower_bytes)?;
659
660 let tick_upper_bytes = update.tick_upper.to_be_bytes();
661 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
662 row_data.write_all(&tick_upper_bytes)?;
663
664 copy_in
665 .send(row_data)
666 .await
667 .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
668 Ok(())
669 }
670
671 pub async fn copy_pool_collects(
677 &self,
678 chain_id: u32,
679 collects: &[PoolFeeCollect],
680 ) -> anyhow::Result<()> {
681 if collects.is_empty() {
682 return Ok(());
683 }
684
685 let copy_statement = r"
686 COPY pool_collect_event (
687 chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
688 log_index, owner, amount0, amount1, tick_lower, tick_upper
689 ) FROM STDIN WITH (FORMAT BINARY)";
690
691 let mut copy_in = self
692 .pool
693 .copy_in_raw(copy_statement)
694 .await
695 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
696
697 self.write_copy_header(&mut copy_in).await?;
699
700 for collect in collects {
702 self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
703 .await?;
704 }
705
706 self.write_copy_trailer(&mut copy_in).await?;
708
709 copy_in.finish().await.map_err(|e| {
711 tracing::error!("COPY operation failed for temp_pool_collect batch:");
713 tracing::error!(" Chain ID: {}", chain_id);
714 tracing::error!(" Batch size: {}", collects.len());
715
716 if !collects.is_empty() {
717 tracing::error!(
718 " Block range: {} to {}",
719 collects.iter().map(|c| c.block).min().unwrap_or(0),
720 collects.iter().map(|c| c.block).max().unwrap_or(0)
721 );
722 }
723
724 for (i, collect) in collects.iter().take(5).enumerate() {
726 tracing::error!(
727 " Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
728 i,
729 collect.transaction_hash,
730 collect.log_index,
731 collect.block,
732 collect.pool_identifier,
733 collect.owner
734 );
735 }
736
737 if collects.len() > 5 {
738 tracing::error!(" ... and {} more collects", collects.len() - 5);
739 }
740
741 anyhow::anyhow!("Failed to finish COPY operation: {e}")
742 })?;
743
744 Ok(())
745 }
746
747 async fn write_pool_fee_collect_binary(
753 &self,
754 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
755 chain_id: u32,
756 collect: &PoolFeeCollect,
757 ) -> anyhow::Result<()> {
758 use std::io::Write;
759 let mut row_data = Vec::new();
760
761 row_data.write_all(&12u16.to_be_bytes())?;
762
763 let chain_id_bytes = (chain_id as i32).to_be_bytes();
764 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
765 row_data.write_all(&chain_id_bytes)?;
766
767 let dex_name_bytes = collect.dex.name.to_string().as_bytes().to_vec();
768 row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
769 row_data.write_all(&dex_name_bytes)?;
770
771 let pool_identifier = collect.instrument_id.to_string();
772 let pool_identifier_bytes = pool_identifier.as_bytes();
773 row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
774 row_data.write_all(pool_identifier_bytes)?;
775
776 let block_bytes = (collect.block as i64).to_be_bytes();
777 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
778 row_data.write_all(&block_bytes)?;
779
780 let tx_hash_bytes = collect.transaction_hash.as_bytes();
781 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
782 row_data.write_all(tx_hash_bytes)?;
783
784 let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
785 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
786 row_data.write_all(&tx_index_bytes)?;
787
788 let log_index_bytes = (collect.log_index as i32).to_be_bytes();
789 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
790 row_data.write_all(&log_index_bytes)?;
791
792 let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
793 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
794 row_data.write_all(&owner_bytes)?;
795
796 let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
797 row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
798 row_data.write_all(&fee0_bytes)?;
799
800 let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
801 row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
802 row_data.write_all(&fee1_bytes)?;
803
804 let tick_lower_bytes = collect.tick_lower.to_be_bytes();
805 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
806 row_data.write_all(&tick_lower_bytes)?;
807
808 let tick_upper_bytes = collect.tick_upper.to_be_bytes();
809 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
810 row_data.write_all(&tick_upper_bytes)?;
811
812 copy_in
813 .send(row_data)
814 .await
815 .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
816 Ok(())
817 }
818
819 async fn write_token_binary(
825 &self,
826 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
827 chain_id: u32,
828 token: &Token,
829 ) -> anyhow::Result<()> {
830 use std::io::Write;
831 let mut row_data = Vec::new();
832
833 row_data.write_all(&5u16.to_be_bytes())?;
834
835 let chain_id_bytes = (chain_id as i32).to_be_bytes();
836 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
837 row_data.write_all(&chain_id_bytes)?;
838
839 let address_bytes = token.address.to_string().as_bytes().to_vec();
840 row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
841 row_data.write_all(&address_bytes)?;
842
843 let name_bytes = token.name.as_bytes();
844 row_data.write_all(&(name_bytes.len() as i32).to_be_bytes())?;
845 row_data.write_all(name_bytes)?;
846
847 let symbol_bytes = token.symbol.as_bytes();
848 row_data.write_all(&(symbol_bytes.len() as i32).to_be_bytes())?;
849 row_data.write_all(symbol_bytes)?;
850
851 let decimals_bytes = (i32::from(token.decimals)).to_be_bytes();
852 row_data.write_all(&(decimals_bytes.len() as i32).to_be_bytes())?;
853 row_data.write_all(&decimals_bytes)?;
854
855 copy_in
856 .send(row_data)
857 .await
858 .map_err(|e| anyhow::anyhow!("Failed to write token data: {e}"))?;
859 Ok(())
860 }
861
862 async fn write_pool_binary(
868 &self,
869 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
870 chain_id: u32,
871 pool: &Pool,
872 ) -> anyhow::Result<()> {
873 use std::io::Write;
874 let mut row_data = Vec::new();
875
876 row_data.write_all(&14u16.to_be_bytes())?;
877
878 let chain_id_bytes = (chain_id as i32).to_be_bytes();
879 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
880 row_data.write_all(&chain_id_bytes)?;
881
882 let dex_name_bytes = pool.dex.name.to_string().as_bytes().to_vec();
883 row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
884 row_data.write_all(&dex_name_bytes)?;
885
886 let address_bytes = pool.address.to_string().as_bytes().to_vec();
887 row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
888 row_data.write_all(&address_bytes)?;
889
890 let pool_identifier_bytes = pool.pool_identifier.as_str().as_bytes();
891 row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
892 row_data.write_all(pool_identifier_bytes)?;
893
894 let creation_block_bytes = (pool.creation_block as i64).to_be_bytes();
895 row_data.write_all(&(creation_block_bytes.len() as i32).to_be_bytes())?;
896 row_data.write_all(&creation_block_bytes)?;
897
898 let token0_chain_bytes = (pool.token0.chain.chain_id as i32).to_be_bytes();
899 row_data.write_all(&(token0_chain_bytes.len() as i32).to_be_bytes())?;
900 row_data.write_all(&token0_chain_bytes)?;
901
902 let token0_address_bytes = pool.token0.address.to_string().as_bytes().to_vec();
903 row_data.write_all(&(token0_address_bytes.len() as i32).to_be_bytes())?;
904 row_data.write_all(&token0_address_bytes)?;
905
906 let token1_chain_bytes = (pool.token1.chain.chain_id as i32).to_be_bytes();
907 row_data.write_all(&(token1_chain_bytes.len() as i32).to_be_bytes())?;
908 row_data.write_all(&token1_chain_bytes)?;
909
910 let token1_address_bytes = pool.token1.address.to_string().as_bytes().to_vec();
911 row_data.write_all(&(token1_address_bytes.len() as i32).to_be_bytes())?;
912 row_data.write_all(&token1_address_bytes)?;
913
914 if let Some(fee) = pool.fee {
915 let fee_bytes = (fee as i32).to_be_bytes();
916 row_data.write_all(&(fee_bytes.len() as i32).to_be_bytes())?;
917 row_data.write_all(&fee_bytes)?;
918 } else {
919 row_data.write_all(&(-1i32).to_be_bytes())?; }
921
922 if let Some(tick_spacing) = pool.tick_spacing {
923 let tick_spacing_bytes = (tick_spacing as i32).to_be_bytes();
924 row_data.write_all(&(tick_spacing_bytes.len() as i32).to_be_bytes())?;
925 row_data.write_all(&tick_spacing_bytes)?;
926 } else {
927 row_data.write_all(&(-1i32).to_be_bytes())?; }
929
930 if let Some(initial_tick) = pool.initial_tick {
931 let initial_tick_bytes = initial_tick.to_be_bytes();
932 row_data.write_all(&(initial_tick_bytes.len() as i32).to_be_bytes())?;
933 row_data.write_all(&initial_tick_bytes)?;
934 } else {
935 row_data.write_all(&(-1i32).to_be_bytes())?; }
937
938 if let Some(ref initial_sqrt_price) = pool.initial_sqrt_price_x96 {
939 let sqrt_price_bytes = format_numeric(initial_sqrt_price).as_bytes().to_vec();
940 row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
941 row_data.write_all(&sqrt_price_bytes)?;
942 } else {
943 row_data.write_all(&(-1i32).to_be_bytes())?; }
945
946 if let Some(ref hooks) = pool.hooks {
947 let hooks_bytes = hooks.to_string().as_bytes().to_vec();
948 row_data.write_all(&(hooks_bytes.len() as i32).to_be_bytes())?;
949 row_data.write_all(&hooks_bytes)?;
950 } else {
951 row_data.write_all(&(-1i32).to_be_bytes())?; }
953
954 copy_in
955 .send(row_data)
956 .await
957 .map_err(|e| anyhow::anyhow!("Failed to write pool data: {e}"))?;
958 Ok(())
959 }
960
961 async fn write_copy_trailer(
965 &self,
966 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
967 ) -> anyhow::Result<()> {
968 let trailer = (-1i16).to_be_bytes();
970 copy_in
971 .send(trailer.to_vec())
972 .await
973 .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
974 Ok(())
975 }
976}