use std::{cmp::max, collections::HashSet, sync::Arc};

use alloy::primitives::Address;
use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_core::UnixNanos;
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
    SharedPool, Token,
    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash},
};

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::erc20::{Erc20Contract, TokenInfoError},
    data::subscription::DefiDataSubscriptionManager,
    events::{
        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent,
        pool_created::PoolCreatedEvent, swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
};

/// Number of blocks processed between sync progress reports.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;

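/// Core blockchain data client which connects to HyperSync and RPC data sources,
/// syncs blocks, DEX pools, and pool events into the blockchain cache, and emits
/// [`DataEvent`]s to a consumer channel.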
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain chain this client operates on.
    pub chain: SharedChain,
    /// Configuration for the blockchain data client.
    pub config: BlockchainDataClientConfig,
    /// Cache for blockchain domain data (blocks, tokens, pools, DEXes).
    pub cache: BlockchainCache,
    /// ERC20 contract helper for fetching token metadata over HTTP RPC.
    tokens: Erc20Contract,
    /// HyperSync client for streaming historical blocks and contract events.
    pub hypersync_client: HyperSyncClient,
    /// Optional chain-specific RPC client used for live data subscriptions.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manager for DeFi data subscriptions.
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel for sending data events to the consumer.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
}

impl BlockchainDataClientCore {
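    /// Creates a new [`BlockchainDataClientCore`] instance.
    ///
    /// A chain-specific RPC client is only created when HyperSync is not used
    /// for live data and a WebSocket RPC URL is configured.
    ///
    /// A minimal construction sketch, assuming a populated
    /// [`BlockchainDataClientConfig`] (channel wiring here is illustrative):
    ///
    /// ```ignore
    /// let (data_tx, mut data_rx) = tokio::sync::mpsc::unbounded_channel();
    /// let core = BlockchainDataClientCore::new(config, None, Some(data_tx));
    /// ```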
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());
        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
        } else {
            None
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client,
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
        }
    }

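    /// Initializes the Postgres cache database when one is configured; otherwise
    /// this is a no-op.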
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            tracing::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

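    /// Creates the chain-specific RPC client for live data.
    ///
    /// # Panics
    ///
    /// Panics if `blockchain` is not one of Ethereum, Polygon, Base, or Arbitrum.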
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
    ) -> BlockchainRpcClientAny {
        match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
            }
            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        }
    }

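    /// Connects the data client and performs the initial sync.
    ///
    /// The sequence is: initialize the cache database, connect the optional RPC
    /// client, connect the cache from the starting block, then register and sync
    /// the pools of every configured DEX.
    ///
    /// A hypothetical startup sketch:
    ///
    /// ```ignore
    /// let mut core = BlockchainDataClientCore::new(config, None, Some(data_tx));
    /// core.connect().await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the RPC connection, cache connection, or a DEX pool
    /// sync fails.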
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        tracing::info!(
            "Connecting to blockchain data source for '{chain_name}' from block {from_block}",
            chain_name = self.chain.name
        );

        self.cache.initialize_chain().await;
        self.cache.connect(from_block).await?;

        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

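    /// Syncs blocks after first checking the consistency of the cached blocks.
    ///
    /// If the cache is gapless, the fast COPY path resumes from
    /// `max(max_block + 1, from_block)`. If a gap is detected, the missing range
    /// is first filled with duplicate-safe INSERTs before COPY resumes past
    /// `max_block`. For example, with `last_continuous_block = 100` and
    /// `max_block = 150`, blocks 101-150 are re-synced with INSERT and COPY
    /// resumes at block 151.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying block sync fails.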
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            if blocks_status.is_consistent() {
                tracing::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                tracing::info!("Starting fast sync with COPY from block {target_block}");
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                tracing::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                tracing::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                tracing::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    blocks_status.max_block + 1
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

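    /// Syncs blocks over the given range from HyperSync into the cache.
    ///
    /// Blocks are written in batches of 1000; `use_copy_command` selects the
    /// fast COPY path (safe only when no duplicate rows can occur) over plain
    /// INSERTs. When `to_block` is `None`, the current chain head is used.
    /// Database performance settings are relaxed for the duration of the sync
    /// and restored afterwards.
    ///
    /// # Errors
    ///
    /// Returns an error if writing a block batch to the cache fails.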
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        let to_block = if let Some(block) = to_block {
            block
        } else {
            self.hypersync_client.current_block().await
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        tracing::info!(
            "Syncing blocks from {from_block} to {to_block} (total: {total_blocks} blocks)"
        );

        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        const BATCH_SIZE: usize = 1000;
        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        while let Some(block) = blocks_stream.next().await {
            let block_number = block.number;
            if self.cache.get_block_timestamp(block_number).is_some() {
                continue;
            }
            batch.push(block);

            if batch.len() >= BATCH_SIZE || block_number >= to_block {
                let batch_size = batch.len();
                self.cache.add_blocks_batch(batch, use_copy_command).await?;
                metrics.update(batch_size);
                batch = Vec::with_capacity(BATCH_SIZE);
            }

            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        if !batch.is_empty() {
            let batch_size = batch.len();
            self.cache.add_blocks_batch(batch, use_copy_command).await?;
            metrics.update(batch_size);
        }

        metrics.log_final_stats();

        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

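    /// Syncs swap, mint, burn, collect, flash, and initialize events for a
    /// single pool over the given block range.
    ///
    /// Unless `reset` is set, syncing resumes after the pool's last synced
    /// block. Parsed events are batched, flushed with ON CONFLICT inserts while
    /// inside block ranges that may already hold rows ("stale" data), and
    /// flushed with the faster COPY path once past them.
    ///
    /// A hypothetical full resync of one pool from its creation block:
    ///
    /// ```ignore
    /// core.sync_pool_events(&dex_id, pool_address, None, None, true).await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the pool is not registered, an event fails to parse,
    /// or a cache write fails.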
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_address: Address,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let pool: SharedPool = self.get_pool(&pool_address)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, &pool_address)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} pool already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(&pool_address)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let flash_event_signature = dex_extended.flash_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        let decode_sig = |sig: &str| hex::decode(sig.strip_prefix("0x").unwrap_or(sig));
        let swap_sig_bytes = decode_sig(swap_event_signature)?;
        let mint_sig_bytes = decode_sig(mint_event_signature)?;
        let burn_sig_bytes = decode_sig(burn_event_signature)?;
        let collect_sig_bytes = decode_sig(collect_event_signature)?;
        let flash_sig_bytes = flash_event_signature.map(|s| decode_sig(s).unwrap_or_default());
        let initialize_sig_bytes =
            initialize_event_signature.map(|s| decode_sig(s).unwrap_or_default());

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];
        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }
        if let Some(event) = dex_extended.flash_created_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                &pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        const EVENT_BATCH_SIZE: usize = 20_000;
        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);

        // Start in COPY-safe mode only when the requested range begins past all
        // previously persisted pool event rows.
        let mut beyond_stale_data = last_block_across_pool_events_table
            .map_or(true, |tables_max| effective_from_block > tables_max);

        while let Some(log) = pool_events_stream.next().await {
            let block_number = extract_block_number(&log)?;
            blocks_processed += block_number - last_block_saved;
            last_block_saved = block_number;

            let event_sig_bytes = extract_event_signature_bytes(&log)?;
            if event_sig_bytes == swap_sig_bytes.as_slice() {
                let swap_event = dex_extended.parse_swap_event(log)?;
                match self.process_pool_swap_event(&swap_event, &pool, &dex_extended) {
                    Ok(swap) => swap_batch.push(swap),
                    Err(e) => tracing::error!("Failed to process swap event: {e}"),
                }
            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                let mint_event = dex_extended.parse_mint_event(log)?;
                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                let burn_event = dex_extended.parse_burn_event(log)?;
                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
                }
            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                let collect_event = dex_extended.parse_collect_event(log)?;
                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                    Ok(fee_collect) => collect_batch.push(fee_collect),
                    Err(e) => tracing::error!("Failed to process collect event: {e}"),
                }
            } else if flash_sig_bytes
                .as_ref()
                .is_some_and(|sig| event_sig_bytes == sig.as_slice())
            {
                if let Some(parse_fn) = dex_extended.parse_flash_event_fn {
                    match parse_fn(dex_extended.dex.clone(), log) {
                        Ok(flash_event) => {
                            match self.process_pool_flash_event(&flash_event, &pool) {
                                Ok(flash) => flash_batch.push(flash),
                                Err(e) => tracing::error!("Failed to process flash event: {e}"),
                            }
                        }
                        Err(e) => tracing::error!("Failed to parse flash event: {e}"),
                    }
                }
            } else if initialize_sig_bytes
                .as_ref()
                .is_some_and(|sig| event_sig_bytes == sig.as_slice())
            {
                let initialize_event = dex_extended.parse_initialize_event(log)?;
                self.cache
                    .update_pool_initialize_price_tick(&initialize_event)
                    .await?;
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                tracing::error!(
                    "Unexpected event signature: {} for log {:?}",
                    event_signature,
                    log
                );
            }

            if !beyond_stale_data
                && last_block_across_pool_events_table
                    .map_or(false, |table_max| block_number > table_max)
            {
                tracing::info!(
                    "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
                    block_number
                );

                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    false,
                    true,
                )
                .await?;

                beyond_stale_data = true;
                tracing::info!("Switched to COPY mode - future batches will use COPY command");
            } else {
                // Use the COPY fast path only once past previously persisted rows.
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    beyond_stale_data,
                    false,
                )
                .await?;
            }

            metrics.update(blocks_processed as usize);
            blocks_processed = 0;

            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
                self.cache
                    .update_pool_last_synced_block(dex, &pool_address, block_number)
                    .await?;
            }
        }

        self.flush_event_batches(
            EVENT_BATCH_SIZE,
            &mut swap_batch,
            &mut liquidity_batch,
            &mut collect_batch,
            &mut flash_batch,
            beyond_stale_data,
            true,
        )
        .await?;

        metrics.log_final_stats();
        self.cache
            .update_pool_last_synced_block(dex, &pool_address, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX '{}' Pool '{}' up to block {}",
            dex,
            pool_display,
            to_block
        );
        Ok(())
    }

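    /// Flushes any event batch that has reached `event_batch_size`, or all
    /// batches when `force_flush_all` is set, writing them to the cache with
    /// either COPY or INSERT semantics per `use_copy_command`.
    ///
    /// # Errors
    ///
    /// Returns an error if a cache write fails.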
    async fn flush_event_batches(
        &mut self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        flash_batch: &mut Vec<PoolFlash>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }
        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
            && !liquidity_batch.is_empty()
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }
        if (force_flush_all || collect_batch.len() >= event_batch_size)
            && !collect_batch.is_empty()
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }
        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
            self.cache.add_pool_flash_batch(flash_batch).await?;
            flash_batch.clear();
        }
        Ok(())
    }

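    /// Converts a parsed [`SwapEvent`] into a [`PoolSwap`], enriching it with
    /// trade data (side, size, price) and the cached block timestamp.
    ///
    /// # Errors
    ///
    /// Returns an error if the event cannot be converted to trade data.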
654 pub fn process_pool_swap_event(
660 &self,
661 swap_event: &SwapEvent,
662 pool: &SharedPool,
663 dex_extended: &DexExtended,
664 ) -> anyhow::Result<PoolSwap> {
665 let timestamp = self
666 .cache
667 .get_block_timestamp(swap_event.block_number)
668 .copied();
669 let (side, size, price) =
670 dex_extended.convert_to_trade_data(&pool.token0, &pool.token1, swap_event)?;
671 let swap = swap_event.to_pool_swap(
672 self.chain.clone(),
673 pool.instrument_id,
674 pool.address,
675 Some(side),
676 Some(size),
677 Some(price),
678 timestamp,
679 );
680
681 Ok(swap)
685 }
686
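    /// Converts a parsed [`MintEvent`] into a [`PoolLiquidityUpdate`] with the
    /// cached block timestamp.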
687 pub fn process_pool_mint_event(
693 &self,
694 mint_event: &MintEvent,
695 pool: &SharedPool,
696 dex_extended: &DexExtended,
697 ) -> anyhow::Result<PoolLiquidityUpdate> {
698 let timestamp = self
699 .cache
700 .get_block_timestamp(mint_event.block_number)
701 .copied();
702
703 let liquidity_update = mint_event.to_pool_liquidity_update(
704 self.chain.clone(),
705 dex_extended.dex.clone(),
706 pool.instrument_id,
707 pool.address,
708 timestamp,
709 );
710
711 Ok(liquidity_update)
714 }
715
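    /// Converts a parsed [`BurnEvent`] into a [`PoolLiquidityUpdate`] with the
    /// cached block timestamp.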
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(liquidity_update)
    }

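    /// Converts a parsed [`CollectEvent`] into a [`PoolFeeCollect`] with the
    /// cached block timestamp.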
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(fee_collect)
    }

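    /// Converts a parsed [`FlashEvent`] into a [`PoolFlash`] with the cached
    /// block timestamp.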
    pub fn process_pool_flash_event(
        &self,
        flash_event: &FlashEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolFlash> {
        let timestamp = self
            .cache
            .get_block_timestamp(flash_event.block_number)
            .copied();

        let flash = flash_event.to_pool_flash(
            self.chain.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(flash)
    }

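    /// Syncs pool-created events for the given DEX factory over the requested
    /// block range, fetching ERC20 token metadata in multicall batches and
    /// caching the resulting pools.
    ///
    /// Unless `reset` is set, syncing resumes after the DEX's last synced
    /// block. Pools referencing tokens with missing or undecodable metadata are
    /// skipped and those tokens are recorded as invalid.
    ///
    /// # Errors
    ///
    /// Returns an error if event parsing, token fetching, or a cache write
    /// fails.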
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(dex).await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let dex = self.get_dex_extended(dex)?.clone();
        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        tokio::pin!(pools_stream);

        let token_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
        const POOL_BATCH_SIZE: usize = 1000;
        let mut token_buffer: HashSet<Address> = HashSet::new();
        let mut pool_buffer: Vec<PoolCreatedEvent> = Vec::new();
        let mut last_block_saved = effective_from_block;

        while let Some(log) = pools_stream.next().await {
            let block_number = extract_block_number(&log)?;
            let blocks_progress = block_number - last_block_saved;
            last_block_saved = block_number;

            let pool = dex.parse_pool_created_event(log)?;
            if self.cache.get_pool(&pool.pool_address).is_some() {
                continue;
            }

            if self.cache.is_invalid_token(&pool.token0)
                || self.cache.is_invalid_token(&pool.token1)
            {
                continue;
            }

            if self.cache.get_token(&pool.token0).is_none() {
                token_buffer.insert(pool.token0);
            }
            if self.cache.get_token(&pool.token1).is_none() {
                token_buffer.insert(pool.token1);
            }
            pool_buffer.push(pool);

            if token_buffer.len() >= token_batch_size || pool_buffer.len() >= POOL_BATCH_SIZE {
                self.flush_tokens_and_process_pools(
                    &mut token_buffer,
                    &mut pool_buffer,
                    dex.dex.clone(),
                )
                .await?;
            }

            metrics.update(blocks_progress as usize);
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        if !token_buffer.is_empty() || !pool_buffer.is_empty() {
            self.flush_tokens_and_process_pools(
                &mut token_buffer,
                &mut pool_buffer,
                dex.dex.clone(),
            )
            .await?;
        }

        metrics.log_final_stats();

        self.cache
            .update_dex_last_synced_block(&dex.dex.name, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX {} pools up to block {}",
            dex.dex.name,
            to_block
        );

        Ok(())
    }

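    /// Drains the token buffer, fetches ERC20 metadata for the batch, caches
    /// valid tokens (recording invalid ones), then constructs and caches the
    /// buffered pools whose tokens resolved successfully.
    ///
    /// # Errors
    ///
    /// Returns an error if token fetching or a cache write fails.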
    async fn flush_tokens_and_process_pools(
        &mut self,
        token_buffer: &mut HashSet<Address>,
        pool_buffer: &mut Vec<PoolCreatedEvent>,
        dex: SharedDex,
    ) -> anyhow::Result<()> {
        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
        let token_infos = self.tokens.batch_fetch_token_info(&batch_addresses).await?;

        let mut empty_tokens = HashSet::new();
        let mut decoding_errors_tokens = HashSet::new();

        for (token_address, token_info) in token_infos {
            match token_info {
                Ok(token) => {
                    let token = Token::new(
                        self.chain.clone(),
                        token_address,
                        token.name,
                        token.symbol,
                        token.decimals,
                    );
                    self.cache.add_token(token).await?;
                }
                Err(token_info_error) => match token_info_error {
                    TokenInfoError::EmptyTokenField { .. } => {
                        empty_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    TokenInfoError::DecodingError { .. } | TokenInfoError::CallFailed { .. } => {
                        decoding_errors_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    _ => {
                        tracing::error!("Error fetching token info: {token_info_error}");
                    }
                },
            }
        }

        let mut pools = Vec::new();
        for pool_event in pool_buffer.iter() {
            if empty_tokens.contains(&pool_event.token0)
                || empty_tokens.contains(&pool_event.token1)
                || decoding_errors_tokens.contains(&pool_event.token0)
                || decoding_errors_tokens.contains(&pool_event.token1)
            {
                continue;
            }

            match self.construct_pool(dex.clone(), pool_event).await {
                Ok(pool) => pools.push(pool),
                Err(e) => tracing::error!(
                    "Failed to process {} with error {}",
                    pool_event.pool_address,
                    e
                ),
            }
        }

        self.cache.add_pools_batch(pools).await?;
        pool_buffer.clear();

        Ok(())
    }

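    /// Builds a [`Pool`] domain object from a pool-created event, resolving
    /// both tokens from the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if either token is not yet cached.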
    async fn construct_pool(
        &mut self,
        dex: SharedDex,
        event: &PoolCreatedEvent,
    ) -> anyhow::Result<Pool> {
        let token0 = match self.cache.get_token(&event.token0) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token0);
            }
        };
        let token1 = match self.cache.get_token(&event.token1) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token1);
            }
        };

        Ok(Pool::new(
            self.chain.clone(),
            dex,
            event.pool_address,
            event.block_number,
            token0,
            token1,
            event.fee,
            event.tick_spacing,
            UnixNanos::default(),
        ))
    }

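    /// Registers a DEX on the current chain: caches its definition, loads its
    /// known pools, and registers its event signatures with the subscription
    /// manager.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX is unknown for the current chain.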
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            let _ = self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
                dex_extended.collect_created_event.as_ref(),
                dex_extended.flash_created_event.as_deref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

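    /// Replays a pool's historical events from the cache database as
    /// [`DataEvent`]s, e.g. to hydrate a pool profiler. Does nothing beyond a
    /// debug log when no database is configured.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`; stream errors are logged and skipped.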
    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
        if let Some(database) = &self.cache.database {
            tracing::info!(
                "Replaying historical events for pool {} to hydrate profiler",
                pool.instrument_id
            );

            let mut event_stream = database.stream_pool_events(
                self.chain.clone(),
                dex.clone(),
                pool.instrument_id,
                &pool.address,
                None,
            );
            let mut event_count = 0;

            while let Some(event_result) = event_stream.next().await {
                match event_result {
                    Ok(event) => {
                        let data_event = match event {
                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
                            DexPoolData::LiquidityUpdate(update) => {
                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
                            }
                            DexPoolData::FeeCollect(collect) => {
                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
                            }
                            DexPoolData::Flash(flash) => {
                                DataEvent::DeFi(DefiData::PoolFlash(flash))
                            }
                        };
                        self.send_data(data_event);
                        event_count += 1;
                    }
                    Err(e) => {
                        tracing::error!(
                            "Error streaming event for pool {}: {e}",
                            pool.instrument_id
                        );
                    }
                }
            }

            tracing::info!(
                "Replayed {event_count} historical events for pool {}",
                pool.instrument_id
            );
        } else {
            tracing::debug!(
                "No database available, skipping event replay for pool {}",
                pool.instrument_id
            );
        }

        Ok(())
    }

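    /// Returns the configured `from_block`, falling back to the earliest DEX
    /// creation block known to the cache (or 0).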
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

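    /// Returns the extended DEX definition for a DEX registered on this client.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX is not registered or unknown for this chain.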
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("DEX {dex_id} doesn't exist for chain {}", self.chain.name),
        }
    }

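    /// Returns the cached pool for the given address.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool is not registered.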
    pub fn get_pool(&self, pool_address: &Address) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_address) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_address} is not registered"),
        }
    }

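    /// Sends a [`DataEvent`] through the data channel, logging an error if no
    /// channel is configured or the send fails.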
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            tracing::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                tracing::error!("Failed to send data: {e}");
            }
        } else {
            tracing::error!("No data event channel for sending data");
        }
    }

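    /// Disconnects the client by shutting down the HyperSync client.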
    pub fn disconnect(&mut self) {
        self.hypersync_client.disconnect();
    }
}