nautilus_blockchain/cache/
copy.rs1use nautilus_model::defi::{Block, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect};
22use sqlx::{PgPool, postgres::PgPoolCopyExt};
23
24fn format_scientific_to_decimal(s: &str) -> String {
26 use std::str::FromStr;
27
28 use rust_decimal::Decimal;
29
30 match Decimal::from_str(s) {
31 Ok(decimal) => decimal.to_string(),
32 Err(_) => s.to_string(), }
34}
35
36fn format_numeric<T: ToString>(value: &T) -> String {
38 let s = value.to_string();
39
40 let s = s.trim_start_matches('+');
42
43 if s.contains('e') || s.contains('E') {
45 return format_scientific_to_decimal(s);
46 }
47
48 s.to_string()
52}
53
/// Handler for PostgreSQL `COPY ... FROM STDIN` operations that borrows a connection pool.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
    /// Borrowed connection pool; the `copy_*` methods start each COPY from it via `copy_in_raw`.
    pool: &'a PgPool,
}
59
60impl<'a> PostgresCopyHandler<'a> {
61 #[must_use]
63 pub const fn new(pool: &'a PgPool) -> Self {
64 Self { pool }
65 }
66
67 pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
76 if blocks.is_empty() {
77 return Ok(());
78 }
79
80 let copy_statement = r"
81 COPY block (
82 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
83 base_fee_per_gas, blob_gas_used, excess_blob_gas,
84 l1_gas_price, l1_gas_used, l1_fee_scalar
85 ) FROM STDIN WITH (FORMAT BINARY)";
86
87 let mut copy_in = self
88 .pool
89 .copy_in_raw(copy_statement)
90 .await
91 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
92
93 self.write_copy_header(&mut copy_in).await?;
95
96 for block in blocks {
98 self.write_block_binary(&mut copy_in, chain_id, block)
99 .await?;
100 }
101
102 self.write_copy_trailer(&mut copy_in).await?;
104
105 copy_in
107 .finish()
108 .await
109 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
110
111 Ok(())
112 }
113
114 pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
120 if swaps.is_empty() {
121 return Ok(());
122 }
123
124 let copy_statement = r"
125 COPY pool_swap_event (
126 chain_id, pool_address, block, transaction_hash, transaction_index,
127 log_index, sender, recipient, side, size, price, sqrt_price_x96, amount0, amount1
128 ) FROM STDIN WITH (FORMAT BINARY)";
129
130 let mut copy_in = self
131 .pool
132 .copy_in_raw(copy_statement)
133 .await
134 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
135
136 self.write_copy_header(&mut copy_in).await?;
138
139 for swap in swaps {
141 self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
142 .await?;
143 }
144
145 self.write_copy_trailer(&mut copy_in).await?;
147
148 copy_in.finish().await.map_err(|e| {
150 tracing::error!("COPY operation failed for pool_swap batch:");
152 tracing::error!(" Chain ID: {}", chain_id);
153 tracing::error!(" Batch size: {}", swaps.len());
154
155 if !swaps.is_empty() {
156 tracing::error!(
157 " Block range: {} to {}",
158 swaps.iter().map(|s| s.block).min().unwrap_or(0),
159 swaps.iter().map(|s| s.block).max().unwrap_or(0)
160 );
161 }
162
163 for (i, swap) in swaps.iter().take(5).enumerate() {
165 tracing::error!(
166 " Swap[{}]: tx={} log_idx={} block={} pool={}",
167 i,
168 swap.transaction_hash,
169 swap.log_index,
170 swap.block,
171 swap.pool_address
172 );
173 }
174
175 if swaps.len() > 5 {
176 tracing::error!(" ... and {} more swaps", swaps.len() - 5);
177 }
178
179 anyhow::anyhow!("Failed to finish COPY operation: {e}")
180 })?;
181
182 Ok(())
183 }
184
185 pub async fn copy_pool_liquidity_updates(
191 &self,
192 chain_id: u32,
193 updates: &[PoolLiquidityUpdate],
194 ) -> anyhow::Result<()> {
195 if updates.is_empty() {
196 return Ok(());
197 }
198
199 let copy_statement = r"
200 COPY pool_liquidity_event (
201 chain_id, pool_address, block, transaction_hash, transaction_index,
202 log_index, event_type, sender, owner, position_liquidity,
203 amount0, amount1, tick_lower, tick_upper
204 ) FROM STDIN WITH (FORMAT BINARY)";
205
206 let mut copy_in = self
207 .pool
208 .copy_in_raw(copy_statement)
209 .await
210 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
211
212 self.write_copy_header(&mut copy_in).await?;
214
215 for update in updates {
217 self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
218 .await?;
219 }
220
221 self.write_copy_trailer(&mut copy_in).await?;
223
224 copy_in.finish().await.map_err(|e| {
226 tracing::error!("COPY operation failed for pool_liquidity batch:");
228 tracing::error!(" Chain ID: {}", chain_id);
229 tracing::error!(" Batch size: {}", updates.len());
230
231 if !updates.is_empty() {
232 tracing::error!(
233 " Block range: {} to {}",
234 updates.iter().map(|u| u.block).min().unwrap_or(0),
235 updates.iter().map(|u| u.block).max().unwrap_or(0)
236 );
237 }
238
239 for (i, update) in updates.iter().take(5).enumerate() {
241 tracing::error!(
242 " Update[{}]: tx={} log_idx={} block={} pool={} type={}",
243 i,
244 update.transaction_hash,
245 update.log_index,
246 update.block,
247 update.pool_address,
248 update.kind
249 );
250 }
251
252 if updates.len() > 5 {
253 tracing::error!(" ... and {} more updates", updates.len() - 5);
254 }
255
256 anyhow::anyhow!("Failed to finish COPY operation: {e}")
257 })?;
258
259 Ok(())
260 }
261
262 async fn write_copy_header(
269 &self,
270 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
271 ) -> anyhow::Result<()> {
272 use std::io::Write;
273 let mut header = Vec::new();
274
275 header.write_all(b"PGCOPY\n\xff\r\n\0")?; header.write_all(&[0, 0, 0, 0])?; header.write_all(&[0, 0, 0, 0])?; copy_in
281 .send(header)
282 .await
283 .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
284 Ok(())
285 }
286
287 async fn write_block_binary(
293 &self,
294 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
295 chain_id: u32,
296 block: &Block,
297 ) -> anyhow::Result<()> {
298 use std::io::Write;
299 let mut row_data = Vec::new();
300
301 row_data.write_all(&14u16.to_be_bytes())?;
303
304 let chain_id_bytes = (chain_id as i32).to_be_bytes();
306 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
307 row_data.write_all(&chain_id_bytes)?;
308
309 let number_bytes = (block.number as i64).to_be_bytes();
311 row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
312 row_data.write_all(&number_bytes)?;
313
314 let hash_bytes = block.hash.as_bytes();
316 row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
317 row_data.write_all(hash_bytes)?;
318
319 let parent_hash_bytes = block.parent_hash.as_bytes();
321 row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
322 row_data.write_all(parent_hash_bytes)?;
323
324 let miner_bytes = block.miner.to_string().as_bytes().to_vec();
326 row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
327 row_data.write_all(&miner_bytes)?;
328
329 let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
331 row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
332 row_data.write_all(&gas_limit_bytes)?;
333
334 let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
336 row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
337 row_data.write_all(&gas_used_bytes)?;
338
339 let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
341 row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
342 row_data.write_all(×tamp_bytes)?;
343
344 if let Some(ref base_fee) = block.base_fee_per_gas {
346 let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
347 row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
348 row_data.write_all(&base_fee_bytes)?;
349 } else {
350 row_data.write_all(&(-1i32).to_be_bytes())?; }
352
353 if let Some(ref blob_gas) = block.blob_gas_used {
355 let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
356 row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
357 row_data.write_all(&blob_gas_bytes)?;
358 } else {
359 row_data.write_all(&(-1i32).to_be_bytes())?; }
361
362 if let Some(ref excess_blob) = block.excess_blob_gas {
364 let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
365 row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
366 row_data.write_all(&excess_blob_bytes)?;
367 } else {
368 row_data.write_all(&(-1i32).to_be_bytes())?; }
370
371 if let Some(ref l1_gas_price) = block.l1_gas_price {
373 let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
374 row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
375 row_data.write_all(&l1_gas_price_bytes)?;
376 } else {
377 row_data.write_all(&(-1i32).to_be_bytes())?; }
379
380 if let Some(l1_gas_used) = block.l1_gas_used {
382 let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
383 row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
384 row_data.write_all(&l1_gas_used_bytes)?;
385 } else {
386 row_data.write_all(&(-1i32).to_be_bytes())?; }
388
389 if let Some(l1_fee_scalar) = block.l1_fee_scalar {
391 let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
392 row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
393 row_data.write_all(&l1_fee_scalar_bytes)?;
394 } else {
395 row_data.write_all(&(-1i32).to_be_bytes())?; }
397
398 copy_in
399 .send(row_data)
400 .await
401 .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
402 Ok(())
403 }
404
405 async fn write_pool_swap_binary(
411 &self,
412 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
413 chain_id: u32,
414 swap: &PoolSwap,
415 ) -> anyhow::Result<()> {
416 use std::io::Write;
417 let mut row_data = Vec::new();
418
419 row_data.write_all(&14u16.to_be_bytes())?;
421
422 let chain_id_bytes = (chain_id as i32).to_be_bytes();
424 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
425 row_data.write_all(&chain_id_bytes)?;
426
427 let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
429 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
430 row_data.write_all(&pool_address_bytes)?;
431
432 let block_bytes = (swap.block as i64).to_be_bytes();
434 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
435 row_data.write_all(&block_bytes)?;
436
437 let tx_hash_bytes = swap.transaction_hash.as_bytes();
439 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
440 row_data.write_all(tx_hash_bytes)?;
441
442 let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
444 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
445 row_data.write_all(&tx_index_bytes)?;
446
447 let log_index_bytes = (swap.log_index as i32).to_be_bytes();
449 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
450 row_data.write_all(&log_index_bytes)?;
451
452 let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
454 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
455 row_data.write_all(&sender_bytes)?;
456
457 let recipient_bytes = swap.recipient.to_string().as_bytes().to_vec();
459 row_data.write_all(&(recipient_bytes.len() as i32).to_be_bytes())?;
460 row_data.write_all(&recipient_bytes)?;
461
462 if let Some(side) = swap.side {
464 let side_bytes = side.to_string().as_bytes().to_vec();
465 row_data.write_all(&(side_bytes.len() as i32).to_be_bytes())?;
466 row_data.write_all(&side_bytes)?;
467 } else {
468 row_data.write_all(&(-1i32).to_be_bytes())?; }
470
471 if let Some(size) = swap.size {
473 let size_bytes = size.to_string().as_bytes().to_vec();
474 row_data.write_all(&(size_bytes.len() as i32).to_be_bytes())?;
475 row_data.write_all(&size_bytes)?;
476 } else {
477 row_data.write_all(&(-1i32).to_be_bytes())?; }
479
480 if let Some(price) = swap.price {
482 let price_bytes = price.to_string().as_bytes().to_vec();
483 row_data.write_all(&(price_bytes.len() as i32).to_be_bytes())?;
484 row_data.write_all(&price_bytes)?;
485 } else {
486 row_data.write_all(&(-1i32).to_be_bytes())?; }
488
489 let sqrt_price_bytes = format_numeric(&swap.sqrt_price_x96).as_bytes().to_vec();
491 row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
492 row_data.write_all(&sqrt_price_bytes)?;
493
494 let amount0_bytes = format_numeric(&swap.amount0).as_bytes().to_vec();
496 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
497 row_data.write_all(&amount0_bytes)?;
498
499 let amount1_bytes = format_numeric(&swap.amount1).as_bytes().to_vec();
501 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
502 row_data.write_all(&amount1_bytes)?;
503
504 copy_in
505 .send(row_data)
506 .await
507 .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
508 Ok(())
509 }
510
511 async fn write_pool_liquidity_update_binary(
517 &self,
518 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
519 chain_id: u32,
520 update: &PoolLiquidityUpdate,
521 ) -> anyhow::Result<()> {
522 use std::io::Write;
523 let mut row_data = Vec::new();
524
525 row_data.write_all(&14u16.to_be_bytes())?;
527
528 let chain_id_bytes = (chain_id as i32).to_be_bytes();
530 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
531 row_data.write_all(&chain_id_bytes)?;
532
533 let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
535 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
536 row_data.write_all(&pool_address_bytes)?;
537
538 let block_bytes = (update.block as i64).to_be_bytes();
540 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
541 row_data.write_all(&block_bytes)?;
542
543 let tx_hash_bytes = update.transaction_hash.as_bytes();
545 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
546 row_data.write_all(tx_hash_bytes)?;
547
548 let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
550 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
551 row_data.write_all(&tx_index_bytes)?;
552
553 let log_index_bytes = (update.log_index as i32).to_be_bytes();
555 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
556 row_data.write_all(&log_index_bytes)?;
557
558 let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
560 row_data.write_all(&(event_type_bytes.len() as i32).to_be_bytes())?;
561 row_data.write_all(&event_type_bytes)?;
562
563 if let Some(sender) = update.sender {
565 let sender_bytes = sender.to_string().as_bytes().to_vec();
566 row_data.write_all(&(sender_bytes.len() as i32).to_be_bytes())?;
567 row_data.write_all(&sender_bytes)?;
568 } else {
569 row_data.write_all(&(-1i32).to_be_bytes())?; }
571
572 let owner_bytes = update.owner.to_string().as_bytes().to_vec();
574 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
575 row_data.write_all(&owner_bytes)?;
576
577 let position_liquidity_bytes = format_numeric(&update.position_liquidity)
579 .as_bytes()
580 .to_vec();
581 row_data.write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())?;
582 row_data.write_all(&position_liquidity_bytes)?;
583
584 let amount0_bytes = format_numeric(&update.amount0).as_bytes().to_vec();
586 row_data.write_all(&(amount0_bytes.len() as i32).to_be_bytes())?;
587 row_data.write_all(&amount0_bytes)?;
588
589 let amount1_bytes = format_numeric(&update.amount1).as_bytes().to_vec();
591 row_data.write_all(&(amount1_bytes.len() as i32).to_be_bytes())?;
592 row_data.write_all(&amount1_bytes)?;
593
594 let tick_lower_bytes = update.tick_lower.to_be_bytes();
596 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
597 row_data.write_all(&tick_lower_bytes)?;
598
599 let tick_upper_bytes = update.tick_upper.to_be_bytes();
601 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
602 row_data.write_all(&tick_upper_bytes)?;
603
604 copy_in
605 .send(row_data)
606 .await
607 .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
608 Ok(())
609 }
610
611 pub async fn copy_pool_collects(
617 &self,
618 chain_id: u32,
619 collects: &[PoolFeeCollect],
620 ) -> anyhow::Result<()> {
621 if collects.is_empty() {
622 return Ok(());
623 }
624
625 let copy_statement = r"
626 COPY pool_collect_event (
627 chain_id, pool_address, block, transaction_hash, transaction_index,
628 log_index, owner, amount0, amount1, tick_lower, tick_upper
629 ) FROM STDIN WITH (FORMAT BINARY)";
630
631 let mut copy_in = self
632 .pool
633 .copy_in_raw(copy_statement)
634 .await
635 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
636
637 self.write_copy_header(&mut copy_in).await?;
639
640 for collect in collects {
642 self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
643 .await?;
644 }
645
646 self.write_copy_trailer(&mut copy_in).await?;
648
649 copy_in.finish().await.map_err(|e| {
651 tracing::error!("COPY operation failed for temp_pool_collect batch:");
653 tracing::error!(" Chain ID: {}", chain_id);
654 tracing::error!(" Batch size: {}", collects.len());
655
656 if !collects.is_empty() {
657 tracing::error!(
658 " Block range: {} to {}",
659 collects.iter().map(|c| c.block).min().unwrap_or(0),
660 collects.iter().map(|c| c.block).max().unwrap_or(0)
661 );
662 }
663
664 for (i, collect) in collects.iter().take(5).enumerate() {
666 tracing::error!(
667 " Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
668 i,
669 collect.transaction_hash,
670 collect.log_index,
671 collect.block,
672 collect.pool_address,
673 collect.owner
674 );
675 }
676
677 if collects.len() > 5 {
678 tracing::error!(" ... and {} more collects", collects.len() - 5);
679 }
680
681 anyhow::anyhow!("Failed to finish COPY operation: {e}")
682 })?;
683
684 Ok(())
685 }
686
687 async fn write_pool_fee_collect_binary(
693 &self,
694 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
695 chain_id: u32,
696 collect: &PoolFeeCollect,
697 ) -> anyhow::Result<()> {
698 use std::io::Write;
699 let mut row_data = Vec::new();
700
701 row_data.write_all(&11u16.to_be_bytes())?;
703
704 let chain_id_bytes = (chain_id as i32).to_be_bytes();
706 row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
707 row_data.write_all(&chain_id_bytes)?;
708
709 let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
711 row_data.write_all(&(pool_address_bytes.len() as i32).to_be_bytes())?;
712 row_data.write_all(&pool_address_bytes)?;
713
714 let block_bytes = (collect.block as i64).to_be_bytes();
716 row_data.write_all(&(block_bytes.len() as i32).to_be_bytes())?;
717 row_data.write_all(&block_bytes)?;
718
719 let tx_hash_bytes = collect.transaction_hash.as_bytes();
721 row_data.write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())?;
722 row_data.write_all(tx_hash_bytes)?;
723
724 let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
726 row_data.write_all(&(tx_index_bytes.len() as i32).to_be_bytes())?;
727 row_data.write_all(&tx_index_bytes)?;
728
729 let log_index_bytes = (collect.log_index as i32).to_be_bytes();
731 row_data.write_all(&(log_index_bytes.len() as i32).to_be_bytes())?;
732 row_data.write_all(&log_index_bytes)?;
733
734 let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
736 row_data.write_all(&(owner_bytes.len() as i32).to_be_bytes())?;
737 row_data.write_all(&owner_bytes)?;
738
739 let fee0_bytes = format_numeric(&collect.amount0).as_bytes().to_vec();
741 row_data.write_all(&(fee0_bytes.len() as i32).to_be_bytes())?;
742 row_data.write_all(&fee0_bytes)?;
743
744 let fee1_bytes = format_numeric(&collect.amount1).as_bytes().to_vec();
746 row_data.write_all(&(fee1_bytes.len() as i32).to_be_bytes())?;
747 row_data.write_all(&fee1_bytes)?;
748
749 let tick_lower_bytes = collect.tick_lower.to_be_bytes();
751 row_data.write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())?;
752 row_data.write_all(&tick_lower_bytes)?;
753
754 let tick_upper_bytes = collect.tick_upper.to_be_bytes();
756 row_data.write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())?;
757 row_data.write_all(&tick_upper_bytes)?;
758
759 copy_in
760 .send(row_data)
761 .await
762 .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
763 Ok(())
764 }
765
766 async fn write_copy_trailer(
770 &self,
771 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
772 ) -> anyhow::Result<()> {
773 let trailer = (-1i16).to_be_bytes();
775 copy_in
776 .send(trailer.to_vec())
777 .await
778 .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
779 Ok(())
780 }
781}