use nautilus_model::defi::{Block, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect};
use sqlx::{PgPool, postgres::PgPoolCopyExt};

/// Streams DeFi domain records (blocks, swaps, liquidity updates, fee
/// collects) into PostgreSQL using the binary `COPY` protocol.
///
/// Borrows a connection pool for lifetime `'a`; each `copy_*` method runs one
/// complete COPY operation (header, rows, trailer) against it.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
    // Pool from which COPY connections are checked out.
    pool: &'a PgPool,
}
29
30impl<'a> PostgresCopyHandler<'a> {
    /// Creates a new handler borrowing the given connection pool.
    #[must_use]
    pub const fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }
36
37 pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
46 if blocks.is_empty() {
47 return Ok(());
48 }
49
50 let copy_statement = r"
51 COPY block (
52 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
53 base_fee_per_gas, blob_gas_used, excess_blob_gas,
54 l1_gas_price, l1_gas_used, l1_fee_scalar
55 ) FROM STDIN WITH (FORMAT BINARY)";
56
57 let mut copy_in = self
58 .pool
59 .copy_in_raw(copy_statement)
60 .await
61 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
62
63 self.write_copy_header(&mut copy_in).await?;
65
66 for block in blocks {
68 self.write_block_binary(&mut copy_in, chain_id, block)
69 .await?;
70 }
71
72 self.write_copy_trailer(&mut copy_in).await?;
74
75 copy_in
77 .finish()
78 .await
79 .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
80
81 Ok(())
82 }
83
84 pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
90 if swaps.is_empty() {
91 return Ok(());
92 }
93
94 let copy_statement = r"
95 COPY pool_swap_event (
96 chain_id, pool_address, block, transaction_hash, transaction_index,
97 log_index, sender, side, size, price
98 ) FROM STDIN WITH (FORMAT BINARY)";
99
100 let mut copy_in = self
101 .pool
102 .copy_in_raw(copy_statement)
103 .await
104 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
105
106 self.write_copy_header(&mut copy_in).await?;
108
109 for swap in swaps {
111 self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
112 .await?;
113 }
114
115 self.write_copy_trailer(&mut copy_in).await?;
117
118 copy_in.finish().await.map_err(|e| {
120 tracing::error!("COPY operation failed for pool_swap batch:");
122 tracing::error!(" Chain ID: {}", chain_id);
123 tracing::error!(" Batch size: {}", swaps.len());
124
125 if !swaps.is_empty() {
126 tracing::error!(
127 " Block range: {} to {}",
128 swaps.iter().map(|s| s.block).min().unwrap_or(0),
129 swaps.iter().map(|s| s.block).max().unwrap_or(0)
130 );
131 }
132
133 for (i, swap) in swaps.iter().take(5).enumerate() {
135 tracing::error!(
136 " Swap[{}]: tx={} log_idx={} block={} pool={}",
137 i,
138 swap.transaction_hash,
139 swap.log_index,
140 swap.block,
141 swap.pool_address
142 );
143 }
144
145 if swaps.len() > 5 {
146 tracing::error!(" ... and {} more swaps", swaps.len() - 5);
147 }
148
149 anyhow::anyhow!("Failed to finish COPY operation: {e}")
150 })?;
151
152 Ok(())
153 }
154
155 pub async fn copy_pool_liquidity_updates(
161 &self,
162 chain_id: u32,
163 updates: &[PoolLiquidityUpdate],
164 ) -> anyhow::Result<()> {
165 if updates.is_empty() {
166 return Ok(());
167 }
168
169 let copy_statement = r"
170 COPY pool_liquidity_event (
171 chain_id, pool_address, block, transaction_hash, transaction_index,
172 log_index, event_type, sender, owner, position_liquidity,
173 amount0, amount1, tick_lower, tick_upper
174 ) FROM STDIN WITH (FORMAT BINARY)";
175
176 let mut copy_in = self
177 .pool
178 .copy_in_raw(copy_statement)
179 .await
180 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
181
182 self.write_copy_header(&mut copy_in).await?;
184
185 for update in updates {
187 self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
188 .await?;
189 }
190
191 self.write_copy_trailer(&mut copy_in).await?;
193
194 copy_in.finish().await.map_err(|e| {
196 tracing::error!("COPY operation failed for pool_liquidity batch:");
198 tracing::error!(" Chain ID: {}", chain_id);
199 tracing::error!(" Batch size: {}", updates.len());
200
201 if !updates.is_empty() {
202 tracing::error!(
203 " Block range: {} to {}",
204 updates.iter().map(|u| u.block).min().unwrap_or(0),
205 updates.iter().map(|u| u.block).max().unwrap_or(0)
206 );
207 }
208
209 for (i, update) in updates.iter().take(5).enumerate() {
211 tracing::error!(
212 " Update[{}]: tx={} log_idx={} block={} pool={} type={}",
213 i,
214 update.transaction_hash,
215 update.log_index,
216 update.block,
217 update.pool_address,
218 update.kind
219 );
220 }
221
222 if updates.len() > 5 {
223 tracing::error!(" ... and {} more updates", updates.len() - 5);
224 }
225
226 anyhow::anyhow!("Failed to finish COPY operation: {e}")
227 })?;
228
229 Ok(())
230 }
231
232 async fn write_copy_header(
239 &self,
240 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
241 ) -> anyhow::Result<()> {
242 use std::io::Write;
243 let mut header = Vec::new();
244
245 header.write_all(b"PGCOPY\n\xff\r\n\0").unwrap(); header.write_all(&[0, 0, 0, 0]).unwrap(); header.write_all(&[0, 0, 0, 0]).unwrap(); copy_in
251 .send(header)
252 .await
253 .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
254 Ok(())
255 }
256
257 async fn write_block_binary(
263 &self,
264 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
265 chain_id: u32,
266 block: &Block,
267 ) -> anyhow::Result<()> {
268 use std::io::Write;
269 let mut row_data = Vec::new();
270
271 row_data.write_all(&14u16.to_be_bytes()).unwrap();
273
274 let chain_id_bytes = (chain_id as i32).to_be_bytes();
276 row_data
277 .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
278 .unwrap();
279 row_data.write_all(&chain_id_bytes).unwrap();
280
281 let number_bytes = (block.number as i64).to_be_bytes();
283 row_data
284 .write_all(&(number_bytes.len() as i32).to_be_bytes())
285 .unwrap();
286 row_data.write_all(&number_bytes).unwrap();
287
288 let hash_bytes = block.hash.as_bytes();
290 row_data
291 .write_all(&(hash_bytes.len() as i32).to_be_bytes())
292 .unwrap();
293 row_data.write_all(hash_bytes).unwrap();
294
295 let parent_hash_bytes = block.parent_hash.as_bytes();
297 row_data
298 .write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())
299 .unwrap();
300 row_data.write_all(parent_hash_bytes).unwrap();
301
302 let miner_bytes = block.miner.to_string().as_bytes().to_vec();
304 row_data
305 .write_all(&(miner_bytes.len() as i32).to_be_bytes())
306 .unwrap();
307 row_data.write_all(&miner_bytes).unwrap();
308
309 let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
311 row_data
312 .write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())
313 .unwrap();
314 row_data.write_all(&gas_limit_bytes).unwrap();
315
316 let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
318 row_data
319 .write_all(&(gas_used_bytes.len() as i32).to_be_bytes())
320 .unwrap();
321 row_data.write_all(&gas_used_bytes).unwrap();
322
323 let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
325 row_data
326 .write_all(&(timestamp_bytes.len() as i32).to_be_bytes())
327 .unwrap();
328 row_data.write_all(×tamp_bytes).unwrap();
329
330 if let Some(ref base_fee) = block.base_fee_per_gas {
332 let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
333 row_data
334 .write_all(&(base_fee_bytes.len() as i32).to_be_bytes())
335 .unwrap();
336 row_data.write_all(&base_fee_bytes).unwrap();
337 } else {
338 row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); }
340
341 if let Some(ref blob_gas) = block.blob_gas_used {
343 let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
344 row_data
345 .write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())
346 .unwrap();
347 row_data.write_all(&blob_gas_bytes).unwrap();
348 } else {
349 row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); }
351
352 if let Some(ref excess_blob) = block.excess_blob_gas {
354 let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
355 row_data
356 .write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())
357 .unwrap();
358 row_data.write_all(&excess_blob_bytes).unwrap();
359 } else {
360 row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); }
362
363 if let Some(ref l1_gas_price) = block.l1_gas_price {
365 let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
366 row_data
367 .write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())
368 .unwrap();
369 row_data.write_all(&l1_gas_price_bytes).unwrap();
370 } else {
371 row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); }
373
374 if let Some(l1_gas_used) = block.l1_gas_used {
376 let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
377 row_data
378 .write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())
379 .unwrap();
380 row_data.write_all(&l1_gas_used_bytes).unwrap();
381 } else {
382 row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); }
384
385 if let Some(l1_fee_scalar) = block.l1_fee_scalar {
387 let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
388 row_data
389 .write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())
390 .unwrap();
391 row_data.write_all(&l1_fee_scalar_bytes).unwrap();
392 } else {
393 row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); }
395
396 copy_in
397 .send(row_data)
398 .await
399 .map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
400 Ok(())
401 }
402
403 async fn write_pool_swap_binary(
409 &self,
410 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
411 chain_id: u32,
412 swap: &PoolSwap,
413 ) -> anyhow::Result<()> {
414 use std::io::Write;
415 let mut row_data = Vec::new();
416
417 row_data.write_all(&10u16.to_be_bytes()).unwrap();
419
420 let chain_id_bytes = (chain_id as i32).to_be_bytes();
422 row_data
423 .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
424 .unwrap();
425 row_data.write_all(&chain_id_bytes).unwrap();
426
427 let pool_address_bytes = swap.pool_address.to_string().as_bytes().to_vec();
429 row_data
430 .write_all(&(pool_address_bytes.len() as i32).to_be_bytes())
431 .unwrap();
432 row_data.write_all(&pool_address_bytes).unwrap();
433
434 let block_bytes = (swap.block as i64).to_be_bytes();
436 row_data
437 .write_all(&(block_bytes.len() as i32).to_be_bytes())
438 .unwrap();
439 row_data.write_all(&block_bytes).unwrap();
440
441 let tx_hash_bytes = swap.transaction_hash.as_bytes();
443 row_data
444 .write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())
445 .unwrap();
446 row_data.write_all(tx_hash_bytes).unwrap();
447
448 let tx_index_bytes = (swap.transaction_index as i32).to_be_bytes();
450 row_data
451 .write_all(&(tx_index_bytes.len() as i32).to_be_bytes())
452 .unwrap();
453 row_data.write_all(&tx_index_bytes).unwrap();
454
455 let log_index_bytes = (swap.log_index as i32).to_be_bytes();
457 row_data
458 .write_all(&(log_index_bytes.len() as i32).to_be_bytes())
459 .unwrap();
460 row_data.write_all(&log_index_bytes).unwrap();
461
462 let sender_bytes = swap.sender.to_string().as_bytes().to_vec();
464 row_data
465 .write_all(&(sender_bytes.len() as i32).to_be_bytes())
466 .unwrap();
467 row_data.write_all(&sender_bytes).unwrap();
468
469 let side_bytes = swap.side.to_string().as_bytes().to_vec();
471 row_data
472 .write_all(&(side_bytes.len() as i32).to_be_bytes())
473 .unwrap();
474 row_data.write_all(&side_bytes).unwrap();
475
476 let size_bytes = swap.size.to_string().as_bytes().to_vec();
478 row_data
479 .write_all(&(size_bytes.len() as i32).to_be_bytes())
480 .unwrap();
481 row_data.write_all(&size_bytes).unwrap();
482
483 let price_bytes = swap.price.to_string().as_bytes().to_vec();
485 row_data
486 .write_all(&(price_bytes.len() as i32).to_be_bytes())
487 .unwrap();
488 row_data.write_all(&price_bytes).unwrap();
489
490 copy_in
491 .send(row_data)
492 .await
493 .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
494 Ok(())
495 }
496
497 async fn write_pool_liquidity_update_binary(
503 &self,
504 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
505 chain_id: u32,
506 update: &PoolLiquidityUpdate,
507 ) -> anyhow::Result<()> {
508 use std::io::Write;
509 let mut row_data = Vec::new();
510
511 row_data.write_all(&14u16.to_be_bytes()).unwrap();
513
514 let chain_id_bytes = (chain_id as i32).to_be_bytes();
516 row_data
517 .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
518 .unwrap();
519 row_data.write_all(&chain_id_bytes).unwrap();
520
521 let pool_address_bytes = update.pool_address.to_string().as_bytes().to_vec();
523 row_data
524 .write_all(&(pool_address_bytes.len() as i32).to_be_bytes())
525 .unwrap();
526 row_data.write_all(&pool_address_bytes).unwrap();
527
528 let block_bytes = (update.block as i64).to_be_bytes();
530 row_data
531 .write_all(&(block_bytes.len() as i32).to_be_bytes())
532 .unwrap();
533 row_data.write_all(&block_bytes).unwrap();
534
535 let tx_hash_bytes = update.transaction_hash.as_bytes();
537 row_data
538 .write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())
539 .unwrap();
540 row_data.write_all(tx_hash_bytes).unwrap();
541
542 let tx_index_bytes = (update.transaction_index as i32).to_be_bytes();
544 row_data
545 .write_all(&(tx_index_bytes.len() as i32).to_be_bytes())
546 .unwrap();
547 row_data.write_all(&tx_index_bytes).unwrap();
548
549 let log_index_bytes = (update.log_index as i32).to_be_bytes();
551 row_data
552 .write_all(&(log_index_bytes.len() as i32).to_be_bytes())
553 .unwrap();
554 row_data.write_all(&log_index_bytes).unwrap();
555
556 let event_type_bytes = update.kind.to_string().as_bytes().to_vec();
558 row_data
559 .write_all(&(event_type_bytes.len() as i32).to_be_bytes())
560 .unwrap();
561 row_data.write_all(&event_type_bytes).unwrap();
562
563 if let Some(sender) = update.sender {
565 let sender_bytes = sender.to_string().as_bytes().to_vec();
566 row_data
567 .write_all(&(sender_bytes.len() as i32).to_be_bytes())
568 .unwrap();
569 row_data.write_all(&sender_bytes).unwrap();
570 } else {
571 row_data.write_all(&(-1i32).to_be_bytes()).unwrap(); }
573
574 let owner_bytes = update.owner.to_string().as_bytes().to_vec();
576 row_data
577 .write_all(&(owner_bytes.len() as i32).to_be_bytes())
578 .unwrap();
579 row_data.write_all(&owner_bytes).unwrap();
580
581 let position_liquidity_bytes = update.position_liquidity.to_string().as_bytes().to_vec();
583 row_data
584 .write_all(&(position_liquidity_bytes.len() as i32).to_be_bytes())
585 .unwrap();
586 row_data.write_all(&position_liquidity_bytes).unwrap();
587
588 let amount0_bytes = update.amount0.to_string().as_bytes().to_vec();
590 row_data
591 .write_all(&(amount0_bytes.len() as i32).to_be_bytes())
592 .unwrap();
593 row_data.write_all(&amount0_bytes).unwrap();
594
595 let amount1_bytes = update.amount1.to_string().as_bytes().to_vec();
597 row_data
598 .write_all(&(amount1_bytes.len() as i32).to_be_bytes())
599 .unwrap();
600 row_data.write_all(&amount1_bytes).unwrap();
601
602 let tick_lower_bytes = update.tick_lower.to_be_bytes();
604 row_data
605 .write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())
606 .unwrap();
607 row_data.write_all(&tick_lower_bytes).unwrap();
608
609 let tick_upper_bytes = update.tick_upper.to_be_bytes();
611 row_data
612 .write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())
613 .unwrap();
614 row_data.write_all(&tick_upper_bytes).unwrap();
615
616 copy_in
617 .send(row_data)
618 .await
619 .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
620 Ok(())
621 }
622
623 pub async fn copy_pool_collects(
629 &self,
630 chain_id: u32,
631 collects: &[PoolFeeCollect],
632 ) -> anyhow::Result<()> {
633 if collects.is_empty() {
634 return Ok(());
635 }
636
637 let copy_statement = r"
638 COPY pool_collect_event (
639 chain_id, pool_address, block, transaction_hash, transaction_index,
640 log_index, owner, fee0, fee1, tick_lower, tick_upper
641 ) FROM STDIN WITH (FORMAT BINARY)";
642
643 let mut copy_in = self
644 .pool
645 .copy_in_raw(copy_statement)
646 .await
647 .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
648
649 self.write_copy_header(&mut copy_in).await?;
651
652 for collect in collects {
654 self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
655 .await?;
656 }
657
658 self.write_copy_trailer(&mut copy_in).await?;
660
661 copy_in.finish().await.map_err(|e| {
663 tracing::error!("COPY operation failed for pool_fee_collect batch:");
665 tracing::error!(" Chain ID: {}", chain_id);
666 tracing::error!(" Batch size: {}", collects.len());
667
668 if !collects.is_empty() {
669 tracing::error!(
670 " Block range: {} to {}",
671 collects.iter().map(|c| c.block).min().unwrap_or(0),
672 collects.iter().map(|c| c.block).max().unwrap_or(0)
673 );
674 }
675
676 for (i, collect) in collects.iter().take(5).enumerate() {
678 tracing::error!(
679 " Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
680 i,
681 collect.transaction_hash,
682 collect.log_index,
683 collect.block,
684 collect.pool_address,
685 collect.owner
686 );
687 }
688
689 if collects.len() > 5 {
690 tracing::error!(" ... and {} more collects", collects.len() - 5);
691 }
692
693 anyhow::anyhow!("Failed to finish COPY operation: {e}")
694 })?;
695
696 Ok(())
697 }
698
699 async fn write_pool_fee_collect_binary(
705 &self,
706 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
707 chain_id: u32,
708 collect: &PoolFeeCollect,
709 ) -> anyhow::Result<()> {
710 use std::io::Write;
711 let mut row_data = Vec::new();
712
713 row_data.write_all(&11u16.to_be_bytes()).unwrap();
715
716 let chain_id_bytes = (chain_id as i32).to_be_bytes();
718 row_data
719 .write_all(&(chain_id_bytes.len() as i32).to_be_bytes())
720 .unwrap();
721 row_data.write_all(&chain_id_bytes).unwrap();
722
723 let pool_address_bytes = collect.pool_address.to_string().as_bytes().to_vec();
725 row_data
726 .write_all(&(pool_address_bytes.len() as i32).to_be_bytes())
727 .unwrap();
728 row_data.write_all(&pool_address_bytes).unwrap();
729
730 let block_bytes = (collect.block as i64).to_be_bytes();
732 row_data
733 .write_all(&(block_bytes.len() as i32).to_be_bytes())
734 .unwrap();
735 row_data.write_all(&block_bytes).unwrap();
736
737 let tx_hash_bytes = collect.transaction_hash.as_bytes();
739 row_data
740 .write_all(&(tx_hash_bytes.len() as i32).to_be_bytes())
741 .unwrap();
742 row_data.write_all(tx_hash_bytes).unwrap();
743
744 let tx_index_bytes = (collect.transaction_index as i32).to_be_bytes();
746 row_data
747 .write_all(&(tx_index_bytes.len() as i32).to_be_bytes())
748 .unwrap();
749 row_data.write_all(&tx_index_bytes).unwrap();
750
751 let log_index_bytes = (collect.log_index as i32).to_be_bytes();
753 row_data
754 .write_all(&(log_index_bytes.len() as i32).to_be_bytes())
755 .unwrap();
756 row_data.write_all(&log_index_bytes).unwrap();
757
758 let owner_bytes = collect.owner.to_string().as_bytes().to_vec();
760 row_data
761 .write_all(&(owner_bytes.len() as i32).to_be_bytes())
762 .unwrap();
763 row_data.write_all(&owner_bytes).unwrap();
764
765 let fee0_bytes = collect.fee0.to_string().as_bytes().to_vec();
767 row_data
768 .write_all(&(fee0_bytes.len() as i32).to_be_bytes())
769 .unwrap();
770 row_data.write_all(&fee0_bytes).unwrap();
771
772 let fee1_bytes = collect.fee1.to_string().as_bytes().to_vec();
774 row_data
775 .write_all(&(fee1_bytes.len() as i32).to_be_bytes())
776 .unwrap();
777 row_data.write_all(&fee1_bytes).unwrap();
778
779 let tick_lower_bytes = collect.tick_lower.to_be_bytes();
781 row_data
782 .write_all(&(tick_lower_bytes.len() as i32).to_be_bytes())
783 .unwrap();
784 row_data.write_all(&tick_lower_bytes).unwrap();
785
786 let tick_upper_bytes = collect.tick_upper.to_be_bytes();
788 row_data
789 .write_all(&(tick_upper_bytes.len() as i32).to_be_bytes())
790 .unwrap();
791 row_data.write_all(&tick_upper_bytes).unwrap();
792
793 copy_in
794 .send(row_data)
795 .await
796 .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
797 Ok(())
798 }
799
800 async fn write_copy_trailer(
804 &self,
805 copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
806 ) -> anyhow::Result<()> {
807 let trailer = (-1i16).to_be_bytes();
809 copy_in
810 .send(trailer.to_vec())
811 .await
812 .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
813 Ok(())
814 }
815}