1use nautilus_model::defi::{
22 Block, Pool, PoolLiquidityUpdate, PoolSwap, Token, data::PoolFeeCollect,
23};
24use sqlx::{PgPool, postgres::PgPoolCopyExt};
25
26fn format_scientific_to_decimal(s: &str) -> String {
28 use std::str::FromStr;
29
30 use rust_decimal::Decimal;
31
32 match Decimal::from_str(s) {
33 Ok(decimal) => decimal.to_string(),
34 Err(_) => s.to_string(), }
36}
37
38fn format_numeric<T: ToString>(value: &T) -> String {
40 let s = value.to_string();
41
42 let s = s.trim_start_matches('+');
44
45 if s.contains('e') || s.contains('E') {
47 return format_scientific_to_decimal(s);
48 }
49
50 s.to_string()
54}
55
56#[derive(Debug)]
58pub struct PostgresCopyHandler<'a> {
59 pool: &'a PgPool,
60}
61
62impl<'a> PostgresCopyHandler<'a> {
63 #[must_use]
65 pub const fn new(pool: &'a PgPool) -> Self {
66 Self { pool }
67 }
68
69 pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
78 if blocks.is_empty() {
79 return Ok(());
80 }
81
82 let copy_statement = r"
83 COPY block (
84 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
85 base_fee_per_gas, blob_gas_used, excess_blob_gas,
86 l1_gas_price, l1_gas_used, l1_fee_scalar
87 ) FROM STDIN WITH (FORMAT BINARY)";
88
89 let mut copy_in = self
90 .pool
91 .copy_in_raw(copy_statement)
92 .await
93 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
94
95 self.write_copy_header(&mut copy_in).await?;
97
98 for block in blocks {
100 self.write_block_binary(&mut copy_in, chain_id, block)
101 .await?;
102 }
103
104 self.write_copy_trailer(&mut copy_in).await?;
106
107 copy_in
109 .finish()
110 .await
111 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
112
113 Ok(())
114 }
115
116 pub async fn copy_tokens(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
122 if tokens.is_empty() {
123 return Ok(());
124 }
125
126 let copy_statement = r"
127 COPY token (
128 chain_id, address, name, symbol, decimals
129 ) FROM STDIN WITH (FORMAT BINARY)";
130
131 let mut copy_in = self
132 .pool
133 .copy_in_raw(copy_statement)
134 .await
135 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
136
137 self.write_copy_header(&mut copy_in).await?;
138 for token in tokens {
139 self.write_token_binary(&mut copy_in, chain_id, token)
140 .await?;
141 }
142 self.write_copy_trailer(&mut copy_in).await?;
143 copy_in
144 .finish()
145 .await
146 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
147
148 Ok(())
149 }
150
151 pub async fn copy_pools(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
157 if pools.is_empty() {
158 return Ok(());
159 }
160
161 let copy_statement = r"
162 COPY pool (
163 chain_id, dex_name, address, creation_block,
164 token0_chain, token0_address, token1_chain, token1_address,
165 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
166 ) FROM STDIN WITH (FORMAT BINARY)";
167
168 let mut copy_in = self
169 .pool
170 .copy_in_raw(copy_statement)
171 .await
172 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
173
174 self.write_copy_header(&mut copy_in).await?;
175 for pool in pools {
176 self.write_pool_binary(&mut copy_in, chain_id, pool).await?;
177 }
178 self.write_copy_trailer(&mut copy_in).await?;
179 copy_in
180 .finish()
181 .await
182 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
183
184 Ok(())
185 }
186
187 pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
193 if swaps.is_empty() {
194 return Ok(());
195 }
196
197 let copy_statement = r"
198 COPY pool_swap_event (
199 chain_id, pool_address, block, transaction_hash, transaction_index,
200 log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
201 order_side, base_quantity, quote_quantity, spot_price, execution_price
202 ) FROM STDIN WITH (FORMAT BINARY)";
203
204 let mut copy_in = self
205 .pool
206 .copy_in_raw(copy_statement)
207 .await
208 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
209
210 self.write_copy_header(&mut copy_in).await?;
212
213 for swap in swaps {
215 self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
216 .await?;
217 }
218
219 self.write_copy_trailer(&mut copy_in).await?;
221
222 copy_in.finish().await.map_err(|e| {
224 tracing::error!("COPY operation failed for pool_swap batch:");
226 tracing::error!(" Chain ID: {}", chain_id);
227 tracing::error!(" Batch size: {}", swaps.len());
228
229 if !swaps.is_empty() {
230 tracing::error!(
231 " Block range: {} to {}",
232 swaps.iter().map(|s| s.block).min().unwrap_or(0),
233 swaps.iter().map(|s| s.block).max().unwrap_or(0)
234 );
235 }
236
237 for (i, swap) in swaps.iter().take(5).enumerate() {
239 tracing::error!(
240 " Swap[{}]: tx={} log_idx={} block={} pool={}",
241 i,
242 swap.transaction_hash,
243 swap.log_index,
244 swap.block,
245 swap.pool_address
246 );
247 }
248
249 if swaps.len() > 5 {
250 tracing::error!(" ... and {} more swaps", swaps.len() - 5);
251 }
252
253 anyhow::anyhow!("Failed to finish COPY operation: {e}")
254 })?;
255
256 Ok(())
257 }
258
259 pub async fn copy_pool_liquidity_updates(
265 &self,
266 chain_id: u32,
267 updates: &[PoolLiquidityUpdate],
268 ) -> anyhow::Result<()> {
269 if updates.is_empty() {
270 return Ok(());
271 }
272
273 let copy_statement = r"
274 COPY pool_liquidity_event (
275 chain_id, pool_address, block, transaction_hash, transaction_index,
276 log_index, event_type, sender, owner, position_liquidity,
277 amount0, amount1, tick_lower, tick_upper
278 ) FROM STDIN WITH (FORMAT BINARY)";
279
280 let mut copy_in = self
281 .pool
282 .copy_in_raw(copy_statement)
283 .await
284 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
285
286 self.write_copy_header(&mut copy_in).await?;
288
289 for update in updates {
291 self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
292 .await?;
293 }
294
295 self.write_copy_trailer(&mut copy_in).await?;
297
298 copy_in.finish().await.map_err(|e| {
300 tracing::error!("COPY operation failed for pool_liquidity batch:");
302 tracing::error!(" Chain ID: {}", chain_id);
303 tracing::error!(" Batch size: {}", updates.len());
304
305 if !updates.is_empty() {
306 tracing::error!(
307 " Block range: {} to {}",
308 updates.iter().map(|u| u.block).min().unwrap_or(0),
309 updates.iter().map(|u| u.block).max().unwrap_or(0)
310 );
311 }
312
313 for (i, update) in updates.iter().take(5).enumerate() {
315 tracing::error!(
316 " Update[{}]: tx={} log_idx={} block={} pool={} type={}",
317 i,
318 update.transaction_hash,
319 update.log_index,
320 update.block,
321 update.pool_address,
322 update.kind
323 );
324 }
325
326 if updates.len() > 5 {
327 tracing::error!(" ... and {} more updates", updates.len() - 5);
328 }
329
330 anyhow::anyhow!("Failed to finish COPY operation: {e}")
331 })?;
332
333 Ok(())
334 }
335
336 async fn write_copy_header(
343 &self,
344 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
345 ) -> anyhow::Result<()> {
346 use std::io::Write;
347 let mut header = Vec::new();
348
349 header.write_all(b"PGCOPY\n\xff\r\n\0")?; header.write_all(&[0, 0, 0, 0])?; header.write_all(&[0, 0, 0, 0])?; copy_in
355 .send(header)
356 .await
357 .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
358 Ok(())
359 }
360
361 async fn write_block_binary(
367 &self,
368 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
369 chain_id: u32,
370 block: &Block,
371 ) -> anyhow::Result<()> {
372 use std::io::Write;
373 let mut row_data = Vec::new();
374
375 row_data.write_all(&14u16.to_be_bytes())?;
377
378 let chain_id_bytes = (chain_id as i32).to_be_bytes();
380 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
381 row_data.write_all(&chain_id_bytes)?;
382
383 let number_bytes = (block.number as i64).to_be_bytes();
385 row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
386 row_data.write_all(&number_bytes)?;
387
388 let hash_bytes = block.hash.as_bytes();
390 row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
391 row_data.write_all(hash_bytes)?;
392
393 let parent_hash_bytes = block.parent_hash.as_bytes();
395 row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
396 row_data.write_all(parent_hash_bytes)?;
397
398 let miner_bytes = block.miner.to_string().as_bytes().to_vec();
400 row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
401 row_data.write_all(&miner_bytes)?;
402
403 let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
405 row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
406 row_data.write_all(&gas_limit_bytes)?;
407
408 let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
410 row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
411 row_data.write_all(&gas_used_bytes)?;
412
413 let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
415 row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
416 row_data.write_all(×tamp_bytes)?;
417
418 if let Some(ref base_fee) = block.base_fee_per_gas {
420 let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
421 row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
422 row_data.write_all(&base_fee_bytes)?;
423 } else {
424 row_data.write_all(&(-1i32).to_be_bytes())?; }
426
427 if let Some(ref blob_gas) = block.blob_gas_used {
429 let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
430 row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
431 row_data.write_all(&blob_gas_bytes)?;
432 } else {
433 row_data.write_all(&(-1i32).to_be_bytes())?; }
435
436 if let Some(ref excess_blob) = block.excess_blob_gas {
438 let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
439 row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
440 row_data.write_all(&excess_blob_bytes)?;
441 } else {
442 row_data.write_all(&(-1i32).to_be_bytes())?; }
444
445 if let Some(ref l1_gas_price) = block.l1_gas_price {
447 let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
448 row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
449 row_data.write_all(&l1_gas_price_bytes)?;
450 } else {
451 row_data.write_all(&(-1i32).to_be_bytes())?; }
453
454 if let Some(l1_gas_used) = block.l1_gas_used {
456 let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
457 row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
458 row_data.write_all(&l1_gas_used_bytes)?;
459 } else {
460 row_data.write_all(&(-1i32).to_be_bytes())?; }
462
463 if let Some(l1_fee_scalar) = block.l1_fee_scalar {
465 let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
466 row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
467 row_data.write_all(&l1_fee_scalar_bytes)?;
468 } else {
469 row_data.write_all(&(-1i32).to_be_bytes())?; }
471
472 copy_in
473 .send(row_data)
474 .await
475 .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
476 Ok(())
477 }
478
479 async fn write_pool_swap_binary(
485 &self,
486 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
487 chain_id: u32,
488 swap: &PoolSwap,
489 ) -> anyhow::Result<()> {
490 use std::io::Write;
491 let mut row_data = Vec::new();
492
493 row_data.write_all(&18u16.to_be_bytes())?;
497
498 let chain_id_bytes = (chain_id as i32).to_be_bytes();
500 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
501 row_data.write_all(&chain_id_bytes)?;
502
503 let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
505 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
506 row_data.write_all(&pool_address_bytes)?;
507
508 let block_bytes = (swap.block as i64).to_be_bytes();
510 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
511 row_data.write_all(&block_bytes)?;
512
513 let tx_hash_bytes = swap.transaction_hash.as_bytes();
515 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
516 row_data.write_all(tx_hash_bytes)?;
517
518 let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
520 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
521 row_data.write_all(&tx_index_bytes)?;
522
523 let log_index_bytes = (swap.log_index as i32).to_be_bytes();
525 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
526 row_data.write_all(&log_index_bytes)?;
527
528 let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
530 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
531 row_data.write_all(&sender_bytes)?;
532
533 let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
535 row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
536 row_data.write_all(&recipient_bytes)?;
537
538 let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
540 row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
541 row_data.write_all(&sqrt_price_bytes)?;
542
543 let liquidity_bytes = format_numeric(&swap.liquidity).as_bytes().to_vec();
545 row_data.write_all(&(liquidity_bytes.len() as i32).to_be_bytes())?;
546 row_data.write_all(&liquidity_bytes)?;
547
548 let tick_bytes = swap.tick.to_be_bytes();
550 row_data.write_all(&(tick_bytes.len() as i32).to_be_bytes())?;
551 row_data.write_all(&tick_bytes)?;
552
553 let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
555 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
556 row_data.write_all(&amount0_bytes)?;
557
558 let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
560 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
561 row_data.write_all(&amount1_bytes)?;
562
563 if let Some(trade_info) = &swap.trade_info {
565 let side_bytes = trade_info.order_side.to_string().as_bytes().to_vec();
567 row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
568 row_data.write_all(&side_bytes)?;
569
570 let base_qty_decimal = trade_info.quantity_base.as_decimal();
572 let base_qty_str = base_qty_decimal.to_string();
573 let base_qty_bytes = base_qty_str.as_bytes();
574 row_data.write_all(&(base_qty_bytes.len() as i32).to_be_bytes())?;
575 row_data.write_all(base_qty_bytes)?;
576
577 let quote_qty_decimal = trade_info.quantity_quote.as_decimal();
579 let quote_qty_str = quote_qty_decimal.to_string();
580 let quote_qty_bytes = quote_qty_str.as_bytes();
581 row_data.write_all(&(quote_qty_bytes.len() as i32).to_be_bytes())?;
582 row_data.write_all(quote_qty_bytes)?;
583
584 let spot_price_decimal = trade_info.spot_price.as_decimal();
586 let spot_price_str = spot_price_decimal.to_string();
587 let spot_price_bytes = spot_price_str.as_bytes();
588 row_data.write_all(&(spot_price_bytes.len() as i32).to_be_bytes())?;
589 row_data.write_all(spot_price_bytes)?;
590
591 let exec_price_decimal = trade_info.execution_price.as_decimal();
593 let exec_price_str = exec_price_decimal.to_string();
594 let exec_price_bytes = exec_price_str.as_bytes();
595 row_data.write_all(&(exec_price_bytes.len() as i32).to_be_bytes())?;
596 row_data.write_all(exec_price_bytes)?;
597 } else {
598 row_data.write_all(&(-1i32).to_be_bytes())?; row_data.write_all(&(-1i32).to_be_bytes())?; row_data.write_all(&(-1i32).to_be_bytes())?; row_data.write_all(&(-1i32).to_be_bytes())?; row_data.write_all(&(-1i32).to_be_bytes())?; }
605
606 copy_in
607 .send(row_data)
608 .await
609 .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
610 Ok(())
611 }
612
613 async fn write_pool_liquidity_update_binary(
619 &self,
620 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
621 chain_id: u32,
622 update: &PoolLiquidityUpdate,
623 ) -> anyhow::Result<()> {
624 use std::io::Write;
625 let mut row_data = Vec::new();
626
627 row_data.write_all(&14u16.to_be_bytes())?;
629
630 let chain_id_bytes = (chain_id as i32).to_be_bytes();
632 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
633 row_data.write_all(&chain_id_bytes)?;
634
635 let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
637 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
638 row_data.write_all(&pool_address_bytes)?;
639
640 let block_bytes = (update.block as i64).to_be_bytes();
642 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
643 row_data.write_all(&block_bytes)?;
644
645 let tx_hash_bytes = update.transaction_hash.as_bytes();
647 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
648 row_data.write_all(tx_hash_bytes)?;
649
650 let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
652 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
653 row_data.write_all(&tx_index_bytes)?;
654
655 let log_index_bytes = (update.log_index as i32).to_be_bytes();
657 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
658 row_data.write_all(&log_index_bytes)?;
659
660 let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
662 row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
663 row_data.write_all(&event_type_bytes)?;
664
665 if let Some(sender) = update.sender {
667 let sender_bytes = sender.to_string().as_bytes().to_vec();
668 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
669 row_data.write_all(&sender_bytes)?;
670 } else {
671 row_data.write_all(&(-1i32).to_be_bytes())?; }
673
674 let owner_bytes = update.owner.to_string().as_bytes().to_vec();
676 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
677 row_data.write_all(&owner_bytes)?;
678
679 let position_liquidity_bytes = format_numeric(&update.position_liquidity)
681 .as_bytes()
682 .to_vec();
683 row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
684 row_data.write_all(&position_liquidity_bytes)?;
685
686 let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
688 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
689 row_data.write_all(&amount0_bytes)?;
690
691 let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
693 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
694 row_data.write_all(&amount1_bytes)?;
695
696 let tick_lower_bytes = update.tick_lower.to_be_bytes();
698 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
699 row_data.write_all(&tick_lower_bytes)?;
700
701 let tick_upper_bytes = update.tick_upper.to_be_bytes();
703 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
704 row_data.write_all(&tick_upper_bytes)?;
705
706 copy_in
707 .send(row_data)
708 .await
709 .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
710 Ok(())
711 }
712
713 pub async fn copy_pool_collects(
719 &self,
720 chain_id: u32,
721 collects: &[PoolFeeCollect],
722 ) -> anyhow::Result<()> {
723 if collects.is_empty() {
724 return Ok(());
725 }
726
727 let copy_statement = r"
728 COPY pool_collect_event (
729 chain_id, pool_address, block, transaction_hash, transaction_index,
730 log_index, owner, amount0, amount1, tick_lower, tick_upper
731 ) FROM STDIN WITH (FORMAT BINARY)";
732
733 let mut copy_in = self
734 .pool
735 .copy_in_raw(copy_statement)
736 .await
737 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
738
739 self.write_copy_header(&mut copy_in).await?;
741
742 for collect in collects {
744 self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
745 .await?;
746 }
747
748 self.write_copy_trailer(&mut copy_in).await?;
750
751 copy_in.finish().await.map_err(|e| {
753 tracing::error!("COPY operation failed for temp_pool_collect batch:");
755 tracing::error!(" Chain ID: {}", chain_id);
756 tracing::error!(" Batch size: {}", collects.len());
757
758 if !collects.is_empty() {
759 tracing::error!(
760 " Block range: {} to {}",
761 collects.iter().map(|c| c.block).min().unwrap_or(0),
762 collects.iter().map(|c| c.block).max().unwrap_or(0)
763 );
764 }
765
766 for (i, collect) in collects.iter().take(5).enumerate() {
768 tracing::error!(
769 " Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
770 i,
771 collect.transaction_hash,
772 collect.log_index,
773 collect.block,
774 collect.pool_address,
775 collect.owner
776 );
777 }
778
779 if collects.len() > 5 {
780 tracing::error!(" ... and {} more collects", collects.len() - 5);
781 }
782
783 anyhow::anyhow!("Failed to finish COPY operation: {e}")
784 })?;
785
786 Ok(())
787 }
788
789 async fn write_pool_fee_collect_binary(
795 &self,
796 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
797 chain_id: u32,
798 collect: &PoolFeeCollect,
799 ) -> anyhow::Result<()> {
800 use std::io::Write;
801 let mut row_data = Vec::new();
802
803 row_data.write_all(&11u16.to_be_bytes())?;
805
806 let chain_id_bytes = (chain_id as i32).to_be_bytes();
808 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
809 row_data.write_all(&chain_id_bytes)?;
810
811 let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
813 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
814 row_data.write_all(&pool_address_bytes)?;
815
816 let block_bytes = (collect.block as i64).to_be_bytes();
818 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
819 row_data.write_all(&block_bytes)?;
820
821 let tx_hash_bytes = collect.transaction_hash.as_bytes();
823 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
824 row_data.write_all(tx_hash_bytes)?;
825
826 let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
828 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
829 row_data.write_all(&tx_index_bytes)?;
830
831 let log_index_bytes = (collect.log_index as i32).to_be_bytes();
833 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
834 row_data.write_all(&log_index_bytes)?;
835
836 let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
838 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
839 row_data.write_all(&owner_bytes)?;
840
841 let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
843 row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
844 row_data.write_all(&fee0_bytes)?;
845
846 let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
848 row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
849 row_data.write_all(&fee1_bytes)?;
850
851 let tick_lower_bytes = collect.tick_lower.to_be_bytes();
853 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
854 row_data.write_all(&tick_lower_bytes)?;
855
856 let tick_upper_bytes = collect.tick_upper.to_be_bytes();
858 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
859 row_data.write_all(&tick_upper_bytes)?;
860
861 copy_in
862 .send(row_data)
863 .await
864 .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
865 Ok(())
866 }
867
868 async fn write_token_binary(
874 &self,
875 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
876 chain_id: u32,
877 token: &Token,
878 ) -> anyhow::Result<()> {
879 use std::io::Write;
880 let mut row_data = Vec::new();
881
882 row_data.write_all(&5u16.to_be_bytes())?;
884
885 let chain_id_bytes = (chain_id as i32).to_be_bytes();
887 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
888 row_data.write_all(&chain_id_bytes)?;
889
890 let address_bytes = token.address.to_string().as_bytes().to_vec();
892 row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
893 row_data.write_all(&address_bytes)?;
894
895 let name_bytes = token.name.as_bytes();
897 row_data.write_all(&(name_bytes.len() as i32).to_be_bytes())?;
898 row_data.write_all(name_bytes)?;
899
900 let symbol_bytes = token.symbol.as_bytes();
902 row_data.write_all(&(symbol_bytes.len() as i32).to_be_bytes())?;
903 row_data.write_all(symbol_bytes)?;
904
905 let decimals_bytes = (i32::from(token.decimals)).to_be_bytes();
907 row_data.write_all(&(decimals_bytes.len() as i32).to_be_bytes())?;
908 row_data.write_all(&decimals_bytes)?;
909
910 copy_in
911 .send(row_data)
912 .await
913 .map_err(|e| anyhow::anyhow!("Failed to write token data: {e}"))?;
914 Ok(())
915 }
916
917 async fn write_pool_binary(
923 &self,
924 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
925 chain_id: u32,
926 pool: &Pool,
927 ) -> anyhow::Result<()> {
928 use std::io::Write;
929 let mut row_data = Vec::new();
930
931 row_data.write_all(&12u16.to_be_bytes())?;
933
934 let chain_id_bytes = (chain_id as i32).to_be_bytes();
936 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
937 row_data.write_all(&chain_id_bytes)?;
938
939 let dex_name_bytes = pool.dex.name.to_string().as_bytes().to_vec();
941 row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
942 row_data.write_all(&dex_name_bytes)?;
943
944 let address_bytes = pool.address.to_string().as_bytes().to_vec();
946 row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
947 row_data.write_all(&address_bytes)?;
948
949 let creation_block_bytes = (pool.creation_block as i64).to_be_bytes();
951 row_data.write_all(&(creation_block_bytes.len() as i32).to_be_bytes())?;
952 row_data.write_all(&creation_block_bytes)?;
953
954 let token0_chain_bytes = (pool.token0.chain.chain_id as i32).to_be_bytes();
956 row_data.write_all(&(token0_chain_bytes.len() as i32).to_be_bytes())?;
957 row_data.write_all(&token0_chain_bytes)?;
958
959 let token0_address_bytes = pool.token0.address.to_string().as_bytes().to_vec();
961 row_data.write_all(&(token0_address_bytes.len() as i32).to_be_bytes())?;
962 row_data.write_all(&token0_address_bytes)?;
963
964 let token1_chain_bytes = (pool.token1.chain.chain_id as i32).to_be_bytes();
966 row_data.write_all(&(token1_chain_bytes.len() as i32).to_be_bytes())?;
967 row_data.write_all(&token1_chain_bytes)?;
968
969 let token1_address_bytes = pool.token1.address.to_string().as_bytes().to_vec();
971 row_data.write_all(&(token1_address_bytes.len() as i32).to_be_bytes())?;
972 row_data.write_all(&token1_address_bytes)?;
973
974 if let Some(fee) = pool.fee {
976 let fee_bytes = (fee as i32).to_be_bytes();
977 row_data.write_all(&(fee_bytes.len() as i32).to_be_bytes())?;
978 row_data.write_all(&fee_bytes)?;
979 } else {
980 row_data.write_all(&(-1i32).to_be_bytes())?; }
982
983 if let Some(tick_spacing) = pool.tick_spacing {
985 let tick_spacing_bytes = (tick_spacing as i32).to_be_bytes();
986 row_data.write_all(&(tick_spacing_bytes.len() as i32).to_be_bytes())?;
987 row_data.write_all(&tick_spacing_bytes)?;
988 } else {
989 row_data.write_all(&(-1i32).to_be_bytes())?; }
991
992 if let Some(initial_tick) = pool.initial_tick {
994 let initial_tick_bytes = initial_tick.to_be_bytes();
995 row_data.write_all(&(initial_tick_bytes.len() as i32).to_be_bytes())?;
996 row_data.write_all(&initial_tick_bytes)?;
997 } else {
998 row_data.write_all(&(-1i32).to_be_bytes())?; }
1000
1001 if let Some(ref initial_sqrt_price) = pool.initial_sqrt_price_x96 {
1003 let sqrt_price_bytes = format_numeric(initial_sqrt_price).as_bytes().to_vec();
1004 row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
1005 row_data.write_all(&sqrt_price_bytes)?;
1006 } else {
1007 row_data.write_all(&(-1i32).to_be_bytes())?; }
1009
1010 copy_in
1011 .send(row_data)
1012 .await
1013 .map_err(|e| anyhow::anyhow!("Failed to write pool data: {e}"))?;
1014 Ok(())
1015 }
1016
1017 async fn write_copy_trailer(
1021 &self,
1022 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
1023 ) -> anyhow::Result<()> {
1024 let trailer = (-1i16).to_be_bytes();
1026 copy_in
1027 .send(trailer.to_vec())
1028 .await
1029 .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
1030 Ok(())
1031 }
1032}