use std::{cmp::max, collections::HashSet, sync::Arc};

use alloy::primitives::{Address, U256};
use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_core::UnixNanos;
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
    SharedPool, Token, data::PoolFeeCollect,
};

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::erc20::{Erc20Contract, TokenInfoError},
    data::subscription::DefiDataSubscriptionManager,
    decode::u256_to_quantity,
    events::{
        burn::BurnEvent, collect::CollectEvent, mint::MintEvent, pool_created::PoolCreatedEvent,
        swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
};

const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50000;

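/// Core blockchain data client for connecting to chains, registering DEXs, and
/// syncing blocks and DEX pool activity into the local cache.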
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain this client operates on.
    pub chain: SharedChain,
    /// Configuration for the data client.
    pub config: BlockchainDataClientConfig,
    /// Cache of blocks, tokens, pools, and DEX metadata.
    pub cache: BlockchainCache,
    /// ERC20 contract interface for fetching token metadata.
    tokens: Erc20Contract,
    /// Client for streaming historical blocks and contract events from HyperSync.
    pub hypersync_client: HyperSyncClient,
    /// Optional chain-specific RPC client for live data.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manager for DeFi data subscriptions.
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Optional channel for sending data events to consumers.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
}

impl BlockchainDataClientCore {
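    /// Creates a new [`BlockchainDataClientCore`] instance with the given
    /// configuration and optional channels for HyperSync messages and outbound
    /// data events.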
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());
        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
        } else {
            None
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client,
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
        }
    }

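    /// Initializes the Postgres cache database if a cache database config is set.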
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            tracing::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

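    /// Constructs the chain-specific RPC client for the given blockchain.
    ///
    /// # Panics
    ///
    /// Panics if the blockchain is not supported for RPC connections.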
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
    ) -> BlockchainRpcClientAny {
        match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
            }
            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        }
    }

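    /// Connects the client: initializes the cache database and chain, connects
    /// the RPC client if configured, then registers and syncs each configured DEX.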
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        tracing::info!(
            "Connecting to blockchain data source for '{chain_name}' from block {from_block}",
            chain_name = self.chain.name
        );

        self.cache.initialize_chain().await;
        self.cache.connect(from_block).await?;

        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

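    /// Syncs blocks after checking cache consistency: any gap between the last
    /// continuous block and the cache maximum is filled with conflict-safe
    /// INSERTs before resuming the fast COPY path.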
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            if blocks_status.is_consistent() {
                tracing::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                tracing::info!("Starting fast sync with COPY from block {}", target_block);
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                tracing::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                tracing::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                tracing::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    blocks_status.max_block + 1
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

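    /// Syncs blocks in the given range by streaming from HyperSync and persisting
    /// them in batches; `use_copy_command` selects the faster COPY path over
    /// conflict-safe INSERTs.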
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        let to_block = if let Some(block) = to_block {
            block
        } else {
            self.hypersync_client.current_block().await
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        tracing::info!(
            "Syncing blocks from {from_block} to {to_block} (total: {total_blocks} blocks)"
        );

        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        const BATCH_SIZE: usize = 1000;
        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        while let Some(block) = blocks_stream.next().await {
            let block_number = block.number;
            if self.cache.get_block_timestamp(block_number).is_some() {
                continue;
            }
            batch.push(block);

            if batch.len() >= BATCH_SIZE || block_number >= to_block {
                let batch_size = batch.len();

                self.cache.add_blocks_batch(batch, use_copy_command).await?;
                metrics.update(batch_size);

                batch = Vec::with_capacity(BATCH_SIZE);
            }

            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        if !batch.is_empty() {
            let batch_size = batch.len();
            self.cache.add_blocks_batch(batch, use_copy_command).await?;
            metrics.update(batch_size);
        }

        metrics.log_final_stats();

        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

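    /// Syncs swap, mint, burn, collect, and initialize events for a single pool,
    /// resuming from the pool's last synced block unless `reset` is set.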
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_address: Address,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let pool: SharedPool = self.get_pool(&pool_address)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, &pool_address)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            tracing::info!(
                "Pool on DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(&pool_address)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing pool '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        // Event signatures may carry a `0x` prefix; strip it before hex-decoding.
        // Decode errors now propagate instead of silently yielding empty bytes.
        let decode_sig = |sig: &str| hex::decode(sig.strip_prefix("0x").unwrap_or(sig));
        let swap_sig_bytes = decode_sig(swap_event_signature)?;
        let mint_sig_bytes = decode_sig(mint_event_signature)?;
        let burn_sig_bytes = decode_sig(burn_event_signature)?;
        let collect_sig_bytes = decode_sig(collect_event_signature)?;
        let initialize_sig_bytes = initialize_event_signature.map(decode_sig).transpose()?;

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];
        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                &pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        const EVENT_BATCH_SIZE: usize = 20000;
        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);

        let mut beyond_stale_data = last_block_across_pool_events_table
            .map_or(true, |tables_max| effective_from_block > tables_max);
        while let Some(log) = pool_events_stream.next().await {
            let block_number = extract_block_number(&log)?;
            blocks_processed += block_number - last_block_saved;
            last_block_saved = block_number;

            let event_sig_bytes = extract_event_signature_bytes(&log)?;
            if event_sig_bytes == swap_sig_bytes.as_slice() {
                let swap_event = dex_extended.parse_swap_event(log)?;
                match self.process_pool_swap_event(&swap_event, &pool, &dex_extended) {
                    Ok(swap) => swap_batch.push(swap),
                    Err(e) => tracing::error!("Failed to process swap event: {e}"),
                }
            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                let mint_event = dex_extended.parse_mint_event(log)?;
                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                let burn_event = dex_extended.parse_burn_event(log)?;
                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
                }
            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                let collect_event = dex_extended.parse_collect_event(log)?;
                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                    Ok(fee_collect) => collect_batch.push(fee_collect),
                    Err(e) => tracing::error!("Failed to process collect event: {e}"),
                }
            } else if initialize_sig_bytes
                .as_ref()
                .is_some_and(|sig| event_sig_bytes == sig.as_slice())
            {
                let initialize_event = dex_extended.parse_initialize_event(log)?;
                self.cache
                    .update_pool_initialize_price_tick(&initialize_event)
                    .await?;
            } else {
                // Previously unreachable whenever an initialize signature was
                // configured; unexpected signatures are now always logged.
                let event_signature = hex::encode(event_sig_bytes);
                tracing::error!(
                    "Unexpected event signature: {} for log {:?}",
                    event_signature,
                    log
                );
            }

            if !beyond_stale_data
                && last_block_across_pool_events_table
                    .is_some_and(|table_max| block_number > table_max)
            {
                tracing::info!(
                    "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
                    block_number
                );

                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    false,
                    true,
                )
                .await?;

                beyond_stale_data = true;
                tracing::info!("Switched to COPY mode - future batches will use COPY command");
            } else {
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    beyond_stale_data,
                    false,
                )
                .await?;
            }

            metrics.update(blocks_processed as usize);
            blocks_processed = 0;

            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
                self.cache
                    .update_pool_last_synced_block(dex, &pool_address, block_number)
                    .await?;
            }
        }

        self.flush_event_batches(
            EVENT_BATCH_SIZE,
            &mut swap_batch,
            &mut liquidity_batch,
            &mut collect_batch,
            beyond_stale_data,
            true,
        )
        .await?;

        metrics.log_final_stats();
        self.cache
            .update_pool_last_synced_block(dex, &pool_address, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX '{}' pool '{}' up to block {}",
            dex,
            pool_display,
            to_block
        );
        Ok(())
    }

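    /// Flushes any event batch that has reached `event_batch_size` (or all
    /// batches when `force_flush_all` is set) to the cache, using COPY when
    /// `use_copy_command` is enabled.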
    async fn flush_event_batches(
        &mut self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if !swap_batch.is_empty() && (force_flush_all || swap_batch.len() >= event_batch_size) {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }
        if !liquidity_batch.is_empty()
            && (force_flush_all || liquidity_batch.len() >= event_batch_size)
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }
        if !collect_batch.is_empty()
            && (force_flush_all || collect_batch.len() >= event_batch_size)
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }
        Ok(())
    }

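    /// Processes a swap event into a [`PoolSwap`], deriving trade side, size,
    /// and price, and attaching the cached block timestamp when available.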
    pub fn process_pool_swap_event(
        &self,
        swap_event: &SwapEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolSwap> {
        let timestamp = self
            .cache
            .get_block_timestamp(swap_event.block_number)
            .copied();

        // Propagate conversion failures instead of panicking.
        let (side, size, price) = dex_extended
            .convert_to_trade_data(&pool.token0, &pool.token1, swap_event)
            .map_err(|e| anyhow::anyhow!("Failed to convert swap event to trade data: {e:?}"))?;
        let swap = swap_event.to_pool_swap(
            self.chain.clone(),
            pool.instrument_id,
            pool.address,
            side,
            size,
            price,
            timestamp,
        );

        Ok(swap)
    }

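    /// Processes a mint event into a [`PoolLiquidityUpdate`], scaling liquidity
    /// and token amounts to their respective decimals.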
    pub fn process_pool_mint_event(
        &self,
        mint_event: &MintEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(mint_event.block_number)
            .copied();
        let liquidity = u256_to_quantity(
            U256::from(mint_event.amount),
            self.chain.native_currency_decimals,
        )?;
        let amount0 = u256_to_quantity(mint_event.amount0, pool.token0.decimals)?;
        let amount1 = u256_to_quantity(mint_event.amount1, pool.token1.decimals)?;

        let liquidity_update = mint_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            liquidity,
            amount0,
            amount1,
            timestamp,
        );

        Ok(liquidity_update)
    }

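    /// Processes a burn event into a [`PoolLiquidityUpdate`], scaling liquidity
    /// and token amounts to their respective decimals.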
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();
        let liquidity = u256_to_quantity(
            U256::from(burn_event.amount),
            self.chain.native_currency_decimals,
        )?;
        let amount0 = u256_to_quantity(burn_event.amount0, pool.token0.decimals)?;
        let amount1 = u256_to_quantity(burn_event.amount1, pool.token1.decimals)?;

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            liquidity,
            amount0,
            amount1,
            timestamp,
        );

        Ok(liquidity_update)
    }

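    /// Processes a collect event into a [`PoolFeeCollect`], scaling collected
    /// fee amounts to the pool token decimals.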
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();
        let fee0 = u256_to_quantity(collect_event.amount0, pool.token0.decimals)?;
        let fee1 = u256_to_quantity(collect_event.amount1, pool.token1.decimals)?;

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            fee0,
            fee1,
            timestamp,
        );

        Ok(fee_collect)
    }

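    /// Syncs pool-created events for a DEX from its factory contract, resolving
    /// token metadata in batches and caching the resulting pools; resumes from
    /// the DEX's last synced block unless `reset` is set.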
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(dex).await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0),
                to_block
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block,
            to_block,
            total_blocks,
            if let Some(last_synced) = last_synced_block {
                format!(" - resuming from last synced block {last_synced}")
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let dex = self.get_dex_extended(dex)?.clone();
        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        tokio::pin!(pools_stream);

        // Each token lookup issues multiple multicall sub-calls (assumed: name,
        // symbol, decimals), hence the division by three.
        let token_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
        const POOL_BATCH_SIZE: usize = 1000;
        let mut token_buffer: HashSet<Address> = HashSet::new();
        let mut pool_buffer: Vec<PoolCreatedEvent> = Vec::new();
        let mut last_block_saved = effective_from_block;

        while let Some(log) = pools_stream.next().await {
            let block_number = extract_block_number(&log)?;
            let blocks_progress = block_number - last_block_saved;
            last_block_saved = block_number;

            let pool = dex.parse_pool_created_event(log)?;
            if self.cache.get_pool(&pool.pool_address).is_some() {
                continue;
            }

            if self.cache.is_invalid_token(&pool.token0)
                || self.cache.is_invalid_token(&pool.token1)
            {
                continue;
            }

            if self.cache.get_token(&pool.token0).is_none() {
                token_buffer.insert(pool.token0);
            }
            if self.cache.get_token(&pool.token1).is_none() {
                token_buffer.insert(pool.token1);
            }
            pool_buffer.push(pool);

            if token_buffer.len() >= token_batch_size || pool_buffer.len() >= POOL_BATCH_SIZE {
                self.flush_tokens_and_process_pools(
                    &mut token_buffer,
                    &mut pool_buffer,
                    dex.dex.clone(),
                )
                .await?;
            }

            metrics.update(blocks_progress as usize);
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
            }
        }

        if !token_buffer.is_empty() || !pool_buffer.is_empty() {
            self.flush_tokens_and_process_pools(
                &mut token_buffer,
                &mut pool_buffer,
                dex.dex.clone(),
            )
            .await?;
        }

        metrics.log_final_stats();

        self.cache
            .update_dex_last_synced_block(&dex.dex.name, to_block)
            .await?;

        tracing::info!(
            "Successfully synced DEX {} pools up to block {}",
            dex.dex.name,
            to_block
        );

        Ok(())
    }

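    /// Fetches metadata for all buffered token addresses, caching valid tokens
    /// and recording invalid ones, then constructs and caches the buffered pools
    /// whose tokens resolved successfully.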
    async fn flush_tokens_and_process_pools(
        &mut self,
        token_buffer: &mut HashSet<Address>,
        pool_buffer: &mut Vec<PoolCreatedEvent>,
        dex: SharedDex,
    ) -> anyhow::Result<()> {
        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
        let token_infos = self.tokens.batch_fetch_token_info(&batch_addresses).await?;

        let mut empty_tokens = HashSet::new();
        let mut decoding_errors_tokens = HashSet::new();

        for (token_address, token_info) in token_infos {
            match token_info {
                Ok(token) => {
                    let token = Token::new(
                        self.chain.clone(),
                        token_address,
                        token.name,
                        token.symbol,
                        token.decimals,
                    );
                    self.cache.add_token(token).await?;
                }
                Err(token_info_error) => match token_info_error {
                    TokenInfoError::EmptyTokenField { .. } => {
                        empty_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    TokenInfoError::DecodingError { .. }
                    | TokenInfoError::CallFailed { .. } => {
                        decoding_errors_tokens.insert(token_address);
                        self.cache
                            .add_invalid_token(token_address, &token_info_error.to_string())
                            .await?;
                    }
                    _ => {
                        tracing::error!("Error fetching token info: {token_info_error}");
                    }
                },
            }
        }

        let mut pools = Vec::new();
        for pool_event in &mut *pool_buffer {
            if empty_tokens.contains(&pool_event.token0)
                || empty_tokens.contains(&pool_event.token1)
                || decoding_errors_tokens.contains(&pool_event.token0)
                || decoding_errors_tokens.contains(&pool_event.token1)
            {
                continue;
            }

            match self.construct_pool(dex.clone(), pool_event).await {
                Ok(pool) => pools.push(pool),
                Err(e) => tracing::error!(
                    "Failed to construct pool {} with error {e}",
                    pool_event.pool_address
                ),
            }
        }

        self.cache.add_pools_batch(pools).await?;
        pool_buffer.clear();
        Ok(())
    }

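    /// Builds a [`Pool`] from a pool-created event, resolving both tokens from
    /// the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if either token has not been cached yet.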
    async fn construct_pool(
        &mut self,
        dex: SharedDex,
        event: &PoolCreatedEvent,
    ) -> anyhow::Result<Pool> {
        let token0 = match self.cache.get_token(&event.token0) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token0);
            }
        };
        let token1 = match self.cache.get_token(&event.token1) {
            Some(token) => token.clone(),
            None => {
                anyhow::bail!("Token {} should be initialized in the cache", event.token1);
            }
        };

        Ok(Pool::new(
            self.chain.clone(),
            dex,
            event.pool_address,
            event.block_number,
            token0,
            token1,
            event.fee,
            event.tick_spacing,
            UnixNanos::default(),
        ))
    }

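    /// Registers a DEX on this chain: caches its definition, loads its known
    /// pools, and registers its event signatures for live data subscriptions.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX is unknown for this chain.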
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

    /// Returns the configured starting block, falling back to the earliest DEX
    /// creation block known to the cache, or zero.
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

    /// Resolves the extended DEX definition for a registered DEX.
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("DEX {dex_id} does not exist for chain {}", self.chain.name),
        }
    }

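    /// Returns the cached pool for the given address.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool is not registered in the cache.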
    pub fn get_pool(&self, pool_address: &Address) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_address) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_address} is not registered"),
        }
    }

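    /// Sends a [`DataEvent`] over the data channel, logging an error if the
    /// channel is missing or the send fails.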
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            tracing::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                tracing::error!("Failed to send data: {e}");
            }
        } else {
            tracing::error!("No data event channel for sending data");
        }
    }

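    /// Disconnects from the HyperSync data source.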
    pub fn disconnect(&mut self) {
        self.hypersync_client.disconnect();
    }
}