1use nautilus_model::defi::{
22 Block, Pool, PoolLiquidityUpdate, PoolSwap, Token, data::PoolFeeCollect,
23};
24use sqlx::{PgPool, postgres::PgPoolCopyExt};
25
26fn format_scientific_to_decimal(s: &str) -> String {
28 use std::str::FromStr;
29
30 use rust_decimal::Decimal;
31
32 match Decimal::from_str(s) {
33 Ok(decimal) => decimal.to_string(),
34 Err(_) => s.to_string(), }
36}
37
38fn format_numeric<T: ToString>(value: &T) -> String {
40 let s = value.to_string();
41
42 let s = s.trim_start_matches('+');
44
45 if s.contains('e') || s.contains('E') {
47 return format_scientific_to_decimal(s);
48 }
49
50 s.to_string()
54}
55
/// Handle that borrows a `PgPool`; the methods implemented on it stream rows
/// into PostgreSQL tables with `COPY ... FROM STDIN WITH (FORMAT BINARY)`.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
    /// Borrowed connection pool on which every COPY operation is started.
    pool: &'a PgPool,
}
61
62impl<'a> PostgresCopyHandler<'a> {
    /// Creates a handler that borrows `pool` for the lifetime `'a`.
    #[must_use]
    pub const fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }
68
69 pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
78 if blocks.is_empty() {
79 return Ok(());
80 }
81
82 let copy_statement = r"
83 COPY block (
84 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
85 base_fee_per_gas, blob_gas_used, excess_blob_gas,
86 l1_gas_price, l1_gas_used, l1_fee_scalar
87 ) FROM STDIN WITH (FORMAT BINARY)";
88
89 let mut copy_in = self
90 .pool
91 .copy_in_raw(copy_statement)
92 .await
93 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
94
95 self.write_copy_header(&mut copy_in).await?;
97
98 for block in blocks {
100 self.write_block_binary(&mut copy_in, chain_id, block)
101 .await?;
102 }
103
104 self.write_copy_trailer(&mut copy_in).await?;
106
107 copy_in
109 .finish()
110 .await
111 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
112
113 Ok(())
114 }
115
116 pub async fn copy_tokens(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
122 if tokens.is_empty() {
123 return Ok(());
124 }
125
126 let copy_statement = r"
127 COPY token (
128 chain_id, address, name, symbol, decimals
129 ) FROM STDIN WITH (FORMAT BINARY)";
130
131 let mut copy_in = self
132 .pool
133 .copy_in_raw(copy_statement)
134 .await
135 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
136
137 self.write_copy_header(&mut copy_in).await?;
138 for token in tokens {
139 self.write_token_binary(&mut copy_in, chain_id, token)
140 .await?;
141 }
142 self.write_copy_trailer(&mut copy_in).await?;
143 copy_in
144 .finish()
145 .await
146 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
147
148 Ok(())
149 }
150
151 pub async fn copy_pools(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
157 if pools.is_empty() {
158 return Ok(());
159 }
160
161 let copy_statement = r"
162 COPY pool (
163 chain_id, dex_name, address, creation_block,
164 token0_chain, token0_address, token1_chain, token1_address,
165 fee, tick_spacing, initial_tick, initial_sqrt_price_x96
166 ) FROM STDIN WITH (FORMAT BINARY)";
167
168 let mut copy_in = self
169 .pool
170 .copy_in_raw(copy_statement)
171 .await
172 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
173
174 self.write_copy_header(&mut copy_in).await?;
175 for pool in pools {
176 self.write_pool_binary(&mut copy_in, chain_id, pool).await?;
177 }
178 self.write_copy_trailer(&mut copy_in).await?;
179 copy_in
180 .finish()
181 .await
182 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
183
184 Ok(())
185 }
186
187 pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
193 if swaps.is_empty() {
194 return Ok(());
195 }
196
197 let copy_statement = r"
198 COPY pool_swap_event (
199 chain_id, pool_address, block, transaction_hash, transaction_index,
200 log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
201 ) FROM STDIN WITH (FORMAT BINARY)";
202
203 let mut copy_in = self
204 .pool
205 .copy_in_raw(copy_statement)
206 .await
207 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
208
209 self.write_copy_header(&mut copy_in).await?;
211
212 for swap in swaps {
214 self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
215 .await?;
216 }
217
218 self.write_copy_trailer(&mut copy_in).await?;
220
221 copy_in.finish().await.map_err(|e| {
223 tracing::error!("COPY operation failed for pool_swap batch:");
225 tracing::error!(" Chain ID: {}", chain_id);
226 tracing::error!(" Batch size: {}", swaps.len());
227
228 if !swaps.is_empty() {
229 tracing::error!(
230 " Block range: {} to {}",
231 swaps.iter().map(|s| s.block).min().unwrap_or(0),
232 swaps.iter().map(|s| s.block).max().unwrap_or(0)
233 );
234 }
235
236 for (i, swap) in swaps.iter().take(5).enumerate() {
238 tracing::error!(
239 " Swap[{}]: tx={} log_idx={} block={} pool={}",
240 i,
241 swap.transaction_hash,
242 swap.log_index,
243 swap.block,
244 swap.pool_address
245 );
246 }
247
248 if swaps.len() > 5 {
249 tracing::error!(" ... and {} more swaps", swaps.len() - 5);
250 }
251
252 anyhow::anyhow!("Failed to finish COPY operation: {e}")
253 })?;
254
255 Ok(())
256 }
257
258 pub async fn copy_pool_liquidity_updates(
264 &self,
265 chain_id: u32,
266 updates: &[PoolLiquidityUpdate],
267 ) -> anyhow::Result<()> {
268 if updates.is_empty() {
269 return Ok(());
270 }
271
272 let copy_statement = r"
273 COPY pool_liquidity_event (
274 chain_id, pool_address, block, transaction_hash, transaction_index,
275 log_index, event_type, sender, owner, position_liquidity,
276 amount0, amount1, tick_lower, tick_upper
277 ) FROM STDIN WITH (FORMAT BINARY)";
278
279 let mut copy_in = self
280 .pool
281 .copy_in_raw(copy_statement)
282 .await
283 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
284
285 self.write_copy_header(&mut copy_in).await?;
287
288 for update in updates {
290 self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
291 .await?;
292 }
293
294 self.write_copy_trailer(&mut copy_in).await?;
296
297 copy_in.finish().await.map_err(|e| {
299 tracing::error!("COPY operation failed for pool_liquidity batch:");
301 tracing::error!(" Chain ID: {}", chain_id);
302 tracing::error!(" Batch size: {}", updates.len());
303
304 if !updates.is_empty() {
305 tracing::error!(
306 " Block range: {} to {}",
307 updates.iter().map(|u| u.block).min().unwrap_or(0),
308 updates.iter().map(|u| u.block).max().unwrap_or(0)
309 );
310 }
311
312 for (i, update) in updates.iter().take(5).enumerate() {
314 tracing::error!(
315 " Update[{}]: tx={} log_idx={} block={} pool={} type={}",
316 i,
317 update.transaction_hash,
318 update.log_index,
319 update.block,
320 update.pool_address,
321 update.kind
322 );
323 }
324
325 if updates.len() > 5 {
326 tracing::error!(" ... and {} more updates", updates.len() - 5);
327 }
328
329 anyhow::anyhow!("Failed to finish COPY operation: {e}")
330 })?;
331
332 Ok(())
333 }
334
335 async fn write_copy_header(
342 &self,
343 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
344 ) -> anyhow::Result<()> {
345 use std::io::Write;
346 let mut header = Vec::new();
347
348 header.write_all(b"PGCOPY\n\xff\r\n\0")?; header.write_all(&[0, 0, 0, 0])?; header.write_all(&[0, 0, 0, 0])?; copy_in
354 .send(header)
355 .await
356 .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
357 Ok(())
358 }
359
360 async fn write_block_binary(
366 &self,
367 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
368 chain_id: u32,
369 block: &Block,
370 ) -> anyhow::Result<()> {
371 use std::io::Write;
372 let mut row_data = Vec::new();
373
374 row_data.write_all(&14u16.to_be_bytes())?;
376
377 let chain_id_bytes = (chain_id as i32).to_be_bytes();
379 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
380 row_data.write_all(&chain_id_bytes)?;
381
382 let number_bytes = (block.number as i64).to_be_bytes();
384 row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
385 row_data.write_all(&number_bytes)?;
386
387 let hash_bytes = block.hash.as_bytes();
389 row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
390 row_data.write_all(hash_bytes)?;
391
392 let parent_hash_bytes = block.parent_hash.as_bytes();
394 row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
395 row_data.write_all(parent_hash_bytes)?;
396
397 let miner_bytes = block.miner.to_string().as_bytes().to_vec();
399 row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
400 row_data.write_all(&miner_bytes)?;
401
402 let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
404 row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
405 row_data.write_all(&gas_limit_bytes)?;
406
407 let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
409 row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
410 row_data.write_all(&gas_used_bytes)?;
411
412 let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
414 row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
415 row_data.write_all(×tamp_bytes)?;
416
417 if let Some(ref base_fee) = block.base_fee_per_gas {
419 let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
420 row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
421 row_data.write_all(&base_fee_bytes)?;
422 } else {
423 row_data.write_all(&(-1i32).to_be_bytes())?; }
425
426 if let Some(ref blob_gas) = block.blob_gas_used {
428 let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
429 row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
430 row_data.write_all(&blob_gas_bytes)?;
431 } else {
432 row_data.write_all(&(-1i32).to_be_bytes())?; }
434
435 if let Some(ref excess_blob) = block.excess_blob_gas {
437 let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
438 row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
439 row_data.write_all(&excess_blob_bytes)?;
440 } else {
441 row_data.write_all(&(-1i32).to_be_bytes())?; }
443
444 if let Some(ref l1_gas_price) = block.l1_gas_price {
446 let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
447 row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
448 row_data.write_all(&l1_gas_price_bytes)?;
449 } else {
450 row_data.write_all(&(-1i32).to_be_bytes())?; }
452
453 if let Some(l1_gas_used) = block.l1_gas_used {
455 let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
456 row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
457 row_data.write_all(&l1_gas_used_bytes)?;
458 } else {
459 row_data.write_all(&(-1i32).to_be_bytes())?; }
461
462 if let Some(l1_fee_scalar) = block.l1_fee_scalar {
464 let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
465 row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
466 row_data.write_all(&l1_fee_scalar_bytes)?;
467 } else {
468 row_data.write_all(&(-1i32).to_be_bytes())?; }
470
471 copy_in
472 .send(row_data)
473 .await
474 .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
475 Ok(())
476 }
477
478 async fn write_pool_swap_binary(
484 &self,
485 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
486 chain_id: u32,
487 swap: &PoolSwap,
488 ) -> anyhow::Result<()> {
489 use std::io::Write;
490 let mut row_data = Vec::new();
491
492 row_data.write_all(&14u16.to_be_bytes())?;
494
495 let chain_id_bytes = (chain_id as i32).to_be_bytes();
497 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
498 row_data.write_all(&chain_id_bytes)?;
499
500 let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
502 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
503 row_data.write_all(&pool_address_bytes)?;
504
505 let block_bytes = (swap.block as i64).to_be_bytes();
507 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
508 row_data.write_all(&block_bytes)?;
509
510 let tx_hash_bytes = swap.transaction_hash.as_bytes();
512 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
513 row_data.write_all(tx_hash_bytes)?;
514
515 let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
517 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
518 row_data.write_all(&tx_index_bytes)?;
519
520 let log_index_bytes = (swap.log_index as i32).to_be_bytes();
522 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
523 row_data.write_all(&log_index_bytes)?;
524
525 let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
527 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
528 row_data.write_all(&sender_bytes)?;
529
530 let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
532 row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
533 row_data.write_all(&recipient_bytes)?;
534
535 if let Some(side) = swap.side {
537 let side_bytes = side.to_string().as_bytes().to_vec();
538 row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
539 row_data.write_all(&side_bytes)?;
540 } else {
541 row_data.write_all(&(-1i32).to_be_bytes())?; }
543
544 if let Some(size) = swap.size {
546 let size_bytes = size.to_string().as_bytes().to_vec();
547 row_data.write_all(&(size_bytes.len() as i32).to_be_bytes())?;
548 row_data.write_all(&size_bytes)?;
549 } else {
550 row_data.write_all(&(-1i32).to_be_bytes())?; }
552
553 if let Some(price) = swap.price {
555 let price_bytes = price.to_string().as_bytes().to_vec();
556 row_data.write_all(&(price_bytes.len() as i32).to_be_bytes())?;
557 row_data.write_all(&price_bytes)?;
558 } else {
559 row_data.write_all(&(-1i32).to_be_bytes())?; }
561
562 let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
564 row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
565 row_data.write_all(&sqrt_price_bytes)?;
566
567 let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
569 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
570 row_data.write_all(&amount0_bytes)?;
571
572 let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
574 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
575 row_data.write_all(&amount1_bytes)?;
576
577 copy_in
578 .send(row_data)
579 .await
580 .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
581 Ok(())
582 }
583
584 async fn write_pool_liquidity_update_binary(
590 &self,
591 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
592 chain_id: u32,
593 update: &PoolLiquidityUpdate,
594 ) -> anyhow::Result<()> {
595 use std::io::Write;
596 let mut row_data = Vec::new();
597
598 row_data.write_all(&14u16.to_be_bytes())?;
600
601 let chain_id_bytes = (chain_id as i32).to_be_bytes();
603 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
604 row_data.write_all(&chain_id_bytes)?;
605
606 let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
608 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
609 row_data.write_all(&pool_address_bytes)?;
610
611 let block_bytes = (update.block as i64).to_be_bytes();
613 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
614 row_data.write_all(&block_bytes)?;
615
616 let tx_hash_bytes = update.transaction_hash.as_bytes();
618 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
619 row_data.write_all(tx_hash_bytes)?;
620
621 let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
623 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
624 row_data.write_all(&tx_index_bytes)?;
625
626 let log_index_bytes = (update.log_index as i32).to_be_bytes();
628 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
629 row_data.write_all(&log_index_bytes)?;
630
631 let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
633 row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
634 row_data.write_all(&event_type_bytes)?;
635
636 if let Some(sender) = update.sender {
638 let sender_bytes = sender.to_string().as_bytes().to_vec();
639 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
640 row_data.write_all(&sender_bytes)?;
641 } else {
642 row_data.write_all(&(-1i32).to_be_bytes())?; }
644
645 let owner_bytes = update.owner.to_string().as_bytes().to_vec();
647 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
648 row_data.write_all(&owner_bytes)?;
649
650 let position_liquidity_bytes = format_numeric(&update.position_liquidity)
652 .as_bytes()
653 .to_vec();
654 row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
655 row_data.write_all(&position_liquidity_bytes)?;
656
657 let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
659 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
660 row_data.write_all(&amount0_bytes)?;
661
662 let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
664 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
665 row_data.write_all(&amount1_bytes)?;
666
667 let tick_lower_bytes = update.tick_lower.to_be_bytes();
669 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
670 row_data.write_all(&tick_lower_bytes)?;
671
672 let tick_upper_bytes = update.tick_upper.to_be_bytes();
674 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
675 row_data.write_all(&tick_upper_bytes)?;
676
677 copy_in
678 .send(row_data)
679 .await
680 .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
681 Ok(())
682 }
683
684 pub async fn copy_pool_collects(
690 &self,
691 chain_id: u32,
692 collects: &[PoolFeeCollect],
693 ) -> anyhow::Result<()> {
694 if collects.is_empty() {
695 return Ok(());
696 }
697
698 let copy_statement = r"
699 COPY pool_collect_event (
700 chain_id, pool_address, block, transaction_hash, transaction_index,
701 log_index, owner, amount0, amount1, tick_lower, tick_upper
702 ) FROM STDIN WITH (FORMAT BINARY)";
703
704 let mut copy_in = self
705 .pool
706 .copy_in_raw(copy_statement)
707 .await
708 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
709
710 self.write_copy_header(&mut copy_in).await?;
712
713 for collect in collects {
715 self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
716 .await?;
717 }
718
719 self.write_copy_trailer(&mut copy_in).await?;
721
722 copy_in.finish().await.map_err(|e| {
724 tracing::error!("COPY operation failed for temp_pool_collect batch:");
726 tracing::error!(" Chain ID: {}", chain_id);
727 tracing::error!(" Batch size: {}", collects.len());
728
729 if !collects.is_empty() {
730 tracing::error!(
731 " Block range: {} to {}",
732 collects.iter().map(|c| c.block).min().unwrap_or(0),
733 collects.iter().map(|c| c.block).max().unwrap_or(0)
734 );
735 }
736
737 for (i, collect) in collects.iter().take(5).enumerate() {
739 tracing::error!(
740 " Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
741 i,
742 collect.transaction_hash,
743 collect.log_index,
744 collect.block,
745 collect.pool_address,
746 collect.owner
747 );
748 }
749
750 if collects.len() > 5 {
751 tracing::error!(" ... and {} more collects", collects.len() - 5);
752 }
753
754 anyhow::anyhow!("Failed to finish COPY operation: {e}")
755 })?;
756
757 Ok(())
758 }
759
760 async fn write_pool_fee_collect_binary(
766 &self,
767 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
768 chain_id: u32,
769 collect: &PoolFeeCollect,
770 ) -> anyhow::Result<()> {
771 use std::io::Write;
772 let mut row_data = Vec::new();
773
774 row_data.write_all(&11u16.to_be_bytes())?;
776
777 let chain_id_bytes = (chain_id as i32).to_be_bytes();
779 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
780 row_data.write_all(&chain_id_bytes)?;
781
782 let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
784 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
785 row_data.write_all(&pool_address_bytes)?;
786
787 let block_bytes = (collect.block as i64).to_be_bytes();
789 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
790 row_data.write_all(&block_bytes)?;
791
792 let tx_hash_bytes = collect.transaction_hash.as_bytes();
794 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
795 row_data.write_all(tx_hash_bytes)?;
796
797 let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
799 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
800 row_data.write_all(&tx_index_bytes)?;
801
802 let log_index_bytes = (collect.log_index as i32).to_be_bytes();
804 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
805 row_data.write_all(&log_index_bytes)?;
806
807 let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
809 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
810 row_data.write_all(&owner_bytes)?;
811
812 let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
814 row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
815 row_data.write_all(&fee0_bytes)?;
816
817 let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
819 row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
820 row_data.write_all(&fee1_bytes)?;
821
822 let tick_lower_bytes = collect.tick_lower.to_be_bytes();
824 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
825 row_data.write_all(&tick_lower_bytes)?;
826
827 let tick_upper_bytes = collect.tick_upper.to_be_bytes();
829 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
830 row_data.write_all(&tick_upper_bytes)?;
831
832 copy_in
833 .send(row_data)
834 .await
835 .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
836 Ok(())
837 }
838
839 async fn write_token_binary(
845 &self,
846 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
847 chain_id: u32,
848 token: &Token,
849 ) -> anyhow::Result<()> {
850 use std::io::Write;
851 let mut row_data = Vec::new();
852
853 row_data.write_all(&5u16.to_be_bytes())?;
855
856 let chain_id_bytes = (chain_id as i32).to_be_bytes();
858 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
859 row_data.write_all(&chain_id_bytes)?;
860
861 let address_bytes = token.address.to_string().as_bytes().to_vec();
863 row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
864 row_data.write_all(&address_bytes)?;
865
866 let name_bytes = token.name.as_bytes();
868 row_data.write_all(&(name_bytes.len() as i32).to_be_bytes())?;
869 row_data.write_all(name_bytes)?;
870
871 let symbol_bytes = token.symbol.as_bytes();
873 row_data.write_all(&(symbol_bytes.len() as i32).to_be_bytes())?;
874 row_data.write_all(symbol_bytes)?;
875
876 let decimals_bytes = (i32::from(token.decimals)).to_be_bytes();
878 row_data.write_all(&(decimals_bytes.len() as i32).to_be_bytes())?;
879 row_data.write_all(&decimals_bytes)?;
880
881 copy_in
882 .send(row_data)
883 .await
884 .map_err(|e| anyhow::anyhow!("Failed to write token data: {e}"))?;
885 Ok(())
886 }
887
888 async fn write_pool_binary(
894 &self,
895 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
896 chain_id: u32,
897 pool: &Pool,
898 ) -> anyhow::Result<()> {
899 use std::io::Write;
900 let mut row_data = Vec::new();
901
902 row_data.write_all(&12u16.to_be_bytes())?;
904
905 let chain_id_bytes = (chain_id as i32).to_be_bytes();
907 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
908 row_data.write_all(&chain_id_bytes)?;
909
910 let dex_name_bytes = pool.dex.name.to_string().as_bytes().to_vec();
912 row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
913 row_data.write_all(&dex_name_bytes)?;
914
915 let address_bytes = pool.address.to_string().as_bytes().to_vec();
917 row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
918 row_data.write_all(&address_bytes)?;
919
920 let creation_block_bytes = (pool.creation_block as i64).to_be_bytes();
922 row_data.write_all(&(creation_block_bytes.len() as i32).to_be_bytes())?;
923 row_data.write_all(&creation_block_bytes)?;
924
925 let token0_chain_bytes = (pool.token0.chain.chain_id as i32).to_be_bytes();
927 row_data.write_all(&(token0_chain_bytes.len() as i32).to_be_bytes())?;
928 row_data.write_all(&token0_chain_bytes)?;
929
930 let token0_address_bytes = pool.token0.address.to_string().as_bytes().to_vec();
932 row_data.write_all(&(token0_address_bytes.len() as i32).to_be_bytes())?;
933 row_data.write_all(&token0_address_bytes)?;
934
935 let token1_chain_bytes = (pool.token1.chain.chain_id as i32).to_be_bytes();
937 row_data.write_all(&(token1_chain_bytes.len() as i32).to_be_bytes())?;
938 row_data.write_all(&token1_chain_bytes)?;
939
940 let token1_address_bytes = pool.token1.address.to_string().as_bytes().to_vec();
942 row_data.write_all(&(token1_address_bytes.len() as i32).to_be_bytes())?;
943 row_data.write_all(&token1_address_bytes)?;
944
945 if let Some(fee) = pool.fee {
947 let fee_bytes = (fee as i32).to_be_bytes();
948 row_data.write_all(&(fee_bytes.len() as i32).to_be_bytes())?;
949 row_data.write_all(&fee_bytes)?;
950 } else {
951 row_data.write_all(&(-1i32).to_be_bytes())?; }
953
954 if let Some(tick_spacing) = pool.tick_spacing {
956 let tick_spacing_bytes = (tick_spacing as i32).to_be_bytes();
957 row_data.write_all(&(tick_spacing_bytes.len() as i32).to_be_bytes())?;
958 row_data.write_all(&tick_spacing_bytes)?;
959 } else {
960 row_data.write_all(&(-1i32).to_be_bytes())?; }
962
963 if let Some(initial_tick) = pool.initial_tick {
965 let initial_tick_bytes = initial_tick.to_be_bytes();
966 row_data.write_all(&(initial_tick_bytes.len() as i32).to_be_bytes())?;
967 row_data.write_all(&initial_tick_bytes)?;
968 } else {
969 row_data.write_all(&(-1i32).to_be_bytes())?; }
971
972 if let Some(ref initial_sqrt_price) = pool.initial_sqrt_price_x96 {
974 let sqrt_price_bytes = format_numeric(initial_sqrt_price).as_bytes().to_vec();
975 row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
976 row_data.write_all(&sqrt_price_bytes)?;
977 } else {
978 row_data.write_all(&(-1i32).to_be_bytes())?; }
980
981 copy_in
982 .send(row_data)
983 .await
984 .map_err(|e| anyhow::anyhow!("Failed to write pool data: {e}"))?;
985 Ok(())
986 }
987
988 async fn write_copy_trailer(
992 &self,
993 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
994 ) -> anyhow::Result<()> {
995 let trailer = (-1i16).to_be_bytes();
997 copy_in
998 .send(trailer.to_vec())
999 .await
1000 .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
1001 Ok(())
1002 }
1003}