use std::{cmp::max, sync::Arc};

use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolProfiler, PoolSwap,
    SharedChain, SharedDex, SharedPool,
    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
};
use thousands::Separable;

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
    data::subscription::DefiDataSubscriptionManager,
    events::{
        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent, swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
    services::PoolDiscoveryService,
};

const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;

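/// Core blockchain data client which synchronizes on-chain DeFi data
/// (blocks, DEX pools, and pool events) into the local blockchain cache
/// and forwards parsed events over the data channel.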
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    pub chain: SharedChain,
    pub config: BlockchainDataClientConfig,
    pub cache: BlockchainCache,
    tokens: Erc20Contract,
    univ3_pool: UniswapV3PoolContract,
    pub hypersync_client: HyperSyncClient,
    pub rpc_client: Option<BlockchainRpcClientAny>,
    pub subscription_manager: DefiDataSubscriptionManager,
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    cancellation_token: tokio_util::sync::CancellationToken,
}

impl BlockchainDataClientCore {
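    /// Creates a new [`BlockchainDataClientCore`] instance for the chain given in `config`.
    ///
    /// A minimal construction sketch (the `config` value and channels are illustrative,
    /// not a prescribed setup):
    ///
    /// ```ignore
    /// let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel();
    /// let client = BlockchainDataClientCore::new(
    ///     config,                                     // BlockchainDataClientConfig
    ///     None,                                       // no HyperSync message channel
    ///     Some(data_tx),                              // receives parsed DataEvents
    ///     tokio_util::sync::CancellationToken::new(),
    /// );
    /// ```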
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());

        tracing::info!(
            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
            chain.name,
            config.http_rpc_url
        );

        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
            tracing::info!("WebSocket RPC URL: {}", wss_rpc_url);
            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
        } else {
            tracing::info!("Using HyperSync for live data (no WebSocket RPC)");
            None
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client.clone(),
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client =
            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            univ3_pool: UniswapV3PoolContract::new(http_rpc_client),
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
            cancellation_token,
        }
    }

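    /// Initializes the Postgres cache database when a cache database config is present;
    /// otherwise the client keeps running on the in-memory cache only.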
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            tracing::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

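    /// Builds the chain-specific WebSocket RPC client.
    ///
    /// # Panics
    ///
    /// Panics if the given blockchain has no RPC client implementation.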
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
    ) -> BlockchainRpcClientAny {
        match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
            }
            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        }
    }

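    /// Connects the client: initializes the cache and chain state, connects the optional
    /// WebSocket RPC client, then registers and syncs pools for every configured DEX,
    /// starting from the block returned by `determine_from_block`.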
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        tracing::info!(
            "Connecting to blockchain data source for '{}' from block {}",
            self.chain.name,
            from_block.separate_with_commas()
        );

        self.cache.initialize_chain().await;
        self.cache.connect(from_block).await?;

        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

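    /// Syncs blocks after first checking the cache for consistency.
    ///
    /// If the cached block range has gaps, the missing blocks are backfilled with INSERT
    /// (safe on conflicts) before continuing from the cache tip with the faster COPY path.
    ///
    /// A usage sketch (the block number is illustrative):
    ///
    /// ```ignore
    /// // Sync from block 18,000,000 up to the current chain tip.
    /// client.sync_blocks_checked(18_000_000, None).await?;
    /// ```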
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            if blocks_status.is_consistent() {
                tracing::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                tracing::info!(
                    "Starting fast sync with COPY from block {}",
                    target_block.separate_with_commas()
                );
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                tracing::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                tracing::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                tracing::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    (blocks_status.max_block + 1).separate_with_commas()
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

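    /// Syncs blocks from `from_block` to `to_block` (or the current chain tip when `None`),
    /// batching cache writes and optionally using the Postgres COPY command.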
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        let to_block = if let Some(block) = to_block {
            block
        } else {
            self.hypersync_client.current_block().await
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        tracing::info!(
            "Syncing blocks from {} to {} (total: {} blocks)",
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        const BATCH_SIZE: usize = 1000;
        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                tracing::info!("Block sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(block) = blocks_stream.next().await {
                    let block_number = block.number;
                    if self.cache.get_block_timestamp(block_number).is_some() {
                        continue;
                    }
                    batch.push(block);

                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
                        let batch_size = batch.len();

                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
                        metrics.update(batch_size);

                        batch = Vec::with_capacity(BATCH_SIZE);
                    }

                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                if !batch.is_empty() {
                    let batch_size = batch.len();
                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
                    metrics.update(batch_size);
                }

                metrics.log_final_stats();
                Ok(())
            } => result
        };

        sync_result?;

        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

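    /// Syncs swap, mint, burn, collect, flash, and initialize events for a single pool,
    /// resuming from the last synced block unless `reset` is `true`.
    ///
    /// A usage sketch (the `pool_id` value is illustrative):
    ///
    /// ```ignore
    /// client
    ///     .sync_pool_events(&DexType::UniswapV3, pool_id, None, None, false)
    ///     .await?;
    /// ```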
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_identifier: PoolIdentifier,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let pool: SharedPool = self.get_pool(&pool_identifier)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);
        let pool_address = &pool.address;

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, &pool_identifier)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(&pool_identifier)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let flash_event_signature = dex_extended.flash_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        let swap_sig_bytes = hex::decode(
            swap_event_signature
                .strip_prefix("0x")
                .unwrap_or(swap_event_signature),
        )?;
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let collect_sig_bytes = hex::decode(
            collect_event_signature
                .strip_prefix("0x")
                .unwrap_or(collect_event_signature),
        )?;
        let flash_sig_bytes = flash_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
        let initialize_sig_bytes = initialize_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];
        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }
        if let Some(event) = dex_extended.flash_created_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        const EVENT_BATCH_SIZE: usize = 20_000;
        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);

        let mut beyond_stale_data = last_block_across_pool_events_table
            .is_none_or(|tables_max| effective_from_block > tables_max);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                tracing::info!("Pool event sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(log) = pool_events_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    blocks_processed += block_number - last_block_saved;
                    last_block_saved = block_number;

                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
                    if event_sig_bytes == swap_sig_bytes.as_slice() {
                        let swap_event = dex_extended.parse_swap_event_hypersync(log)?;
                        match self.process_pool_swap_event(&swap_event, &pool) {
                            Ok(swap) => swap_batch.push(swap),
                            Err(e) => tracing::error!("Failed to process swap event: {e}"),
                        }
                    } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                        let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
                        match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                            Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                            Err(e) => tracing::error!("Failed to process mint event: {e}"),
                        }
                    } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                        let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
                        match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                            Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                            Err(e) => tracing::error!("Failed to process burn event: {e}"),
                        }
                    } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                        let collect_event = dex_extended.parse_collect_event_hypersync(log)?;
                        match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                            Ok(fee_collect) => collect_batch.push(fee_collect),
                            Err(e) => tracing::error!("Failed to process collect event: {e}"),
                        }
                    } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                        let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
                        self.cache
                            .update_pool_initialize_price_tick(&initialize_event)
                            .await?;
                    } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                        if let Some(parse_fn) = dex_extended.parse_flash_event_hypersync_fn {
                            match parse_fn(dex_extended.dex.clone(), log) {
                                Ok(flash_event) => {
                                    match self.process_pool_flash_event(&flash_event, &pool) {
                                        Ok(flash) => flash_batch.push(flash),
                                        Err(e) => tracing::error!("Failed to process flash event: {e}"),
                                    }
                                }
                                Err(e) => tracing::error!("Failed to parse flash event: {e}"),
                            }
                        }
                    } else {
                        let event_signature = hex::encode(event_sig_bytes);
                        tracing::error!(
                            "Unexpected event signature: {} for log {:?}",
                            event_signature,
                            log
                        );
                    }

                    if !beyond_stale_data
                        && last_block_across_pool_events_table
                            .is_some_and(|table_max| block_number > table_max)
                    {
                        tracing::info!(
                            "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
                            block_number
                        );

                        self.flush_event_batches(
                            EVENT_BATCH_SIZE,
                            &mut swap_batch,
                            &mut liquidity_batch,
                            &mut collect_batch,
                            &mut flash_batch,
                            false,
                            true,
                        )
                        .await?;

                        beyond_stale_data = true;
                        tracing::info!("Switched to COPY mode - future batches will use COPY command");
                    } else {
                        // Use COPY once past the stale-data boundary; otherwise insert with ON CONFLICT
                        self.flush_event_batches(
                            EVENT_BATCH_SIZE,
                            &mut swap_batch,
                            &mut liquidity_batch,
                            &mut collect_batch,
                            &mut flash_batch,
                            beyond_stale_data,
                            false,
                        )
                        .await?;
                    }

                    metrics.update(blocks_processed as usize);
                    blocks_processed = 0;

                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                        self.cache
                            .update_pool_last_synced_block(dex, &pool_identifier, block_number)
                            .await?;
                    }
                }

                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    beyond_stale_data,
                    true,
                )
                .await?;

                metrics.log_final_stats();
                self.cache
                    .update_pool_last_synced_block(dex, &pool_identifier, to_block)
                    .await?;

                tracing::info!(
                    "Successfully synced DEX '{}' Pool '{}' up to block {}",
                    dex,
                    pool_display,
                    to_block.separate_with_commas()
                );
                Ok(())
            } => result
        };

        sync_result
    }

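    /// Flushes any event batch which has reached `event_batch_size` (or all non-empty
    /// batches when `force_flush_all` is set) into the cache, optionally via COPY.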
    #[allow(clippy::too_many_arguments)]
    async fn flush_event_batches(
        &mut self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        flash_batch: &mut Vec<PoolFlash>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }
        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
            && !liquidity_batch.is_empty()
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }
        if (force_flush_all || collect_batch.len() >= event_batch_size) && !collect_batch.is_empty()
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }
        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
            self.cache.add_pool_flash_batch(flash_batch).await?;
            flash_batch.clear();
        }
        Ok(())
    }

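    /// Converts a raw swap event into a [`PoolSwap`], attaching the cached block timestamp
    /// and calculating derived trade info from the pool's token pair.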
    pub fn process_pool_swap_event(
        &self,
        swap_event: &SwapEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolSwap> {
        let timestamp = self
            .cache
            .get_block_timestamp(swap_event.block_number)
            .copied();
        let mut swap = swap_event.to_pool_swap(
            self.chain.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            timestamp,
        );
        swap.calculate_trade_info(&pool.token0, &pool.token1, None)?;

        Ok(swap)
    }

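    /// Converts a raw mint event into a [`PoolLiquidityUpdate`] with the cached block timestamp.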
    pub fn process_pool_mint_event(
        &self,
        mint_event: &MintEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(mint_event.block_number)
            .copied();

        let liquidity_update = mint_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            timestamp,
        );

        Ok(liquidity_update)
    }

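    /// Converts a raw burn event into a [`PoolLiquidityUpdate`] with the cached block timestamp.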
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            timestamp,
        );

        Ok(liquidity_update)
    }

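    /// Converts a raw collect event into a [`PoolFeeCollect`] with the cached block timestamp.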
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            timestamp,
        );

        Ok(fee_collect)
    }

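    /// Converts a raw flash loan event into a [`PoolFlash`] with the cached block timestamp.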
    pub fn process_pool_flash_event(
        &self,
        flash_event: &FlashEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolFlash> {
        let timestamp = self
            .cache
            .get_block_timestamp(flash_event.block_number)
            .copied();

        let flash = flash_event.to_pool_flash(self.chain.clone(), pool.instrument_id, timestamp);

        Ok(flash)
    }

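    /// Discovers and syncs all pools for the given DEX via the pool discovery service.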
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let dex_extended = self.get_dex_extended(dex)?.clone();

        let mut service = PoolDiscoveryService::new(
            self.chain.clone(),
            &mut self.cache,
            &self.tokens,
            &self.hypersync_client,
            self.cancellation_token.clone(),
            self.config.clone(),
        );

        service
            .sync_pools(&dex_extended, from_block, to_block, reset)
            .await?;

        Ok(())
    }

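    /// Registers the given DEX for this chain in the cache and subscription manager,
    /// loading any pools already cached for it.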
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            let _ = self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
                dex_extended.collect_created_event.as_ref(),
                dex_extended.flash_created_event.as_deref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

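    /// Bootstraps the most recent pool profiler state, preferring the latest valid cached
    /// snapshot and replaying only the events recorded after it.
    ///
    /// Returns the profiler together with a flag indicating whether its state was already
    /// validated against on-chain data during construction.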
    pub async fn bootstrap_latest_pool_profiler(
        &mut self,
        pool: &SharedPool,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        tracing::info!(
            "Bootstrapping latest pool profiler for pool {}",
            pool.address
        );

        if self.cache.database.is_none() {
            anyhow::bail!(
                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
            );
        }

        let mut profiler = PoolProfiler::new(pool.clone());

        let from_position = match self
            .cache
            .database
            .as_ref()
            .unwrap()
            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.pool_identifier)
            .await
        {
            Ok(Some(snapshot)) => {
                tracing::info!(
                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
                    snapshot.block_position.number.separate_with_commas(),
                    snapshot.positions.len(),
                    snapshot.ticks.len()
                );
                let block_position = snapshot.block_position.clone();
                profiler.restore_from_snapshot(snapshot)?;
                tracing::info!("Restored profiler from snapshot");
                Some(block_position)
            }
            _ => {
                tracing::info!("No valid snapshot found, processing from beginning");
                None
            }
        };

        if self
            .cache
            .database
            .as_ref()
            .unwrap()
            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.pool_identifier)
            .await?
            .is_none()
        {
            return self
                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
                .await;
        }

        if let Err(e) = self
            .sync_pool_events(&pool.dex.name, pool.pool_identifier, None, None, false)
            .await
        {
            tracing::error!("Failed to sync pool events for snapshot request: {e}");
        }

        if !profiler.is_initialized {
            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
                profiler.initialize(initial_sqrt_price_x96);
            } else {
                anyhow::bail!(
                    "Pool is not initialized and it doesn't contain an initial price, cannot bootstrap profiler"
                );
            }
        }

        let from_block = from_position
            .as_ref()
            .map_or(profiler.pool.creation_block, |block_position| {
                block_position.number
            });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
            pool.chain.clone(),
            pool.dex.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            from_position.clone(),
        );

        while let Some(result) = stream.next().await {
            match result {
                Ok(event) => {
                    profiler.process(&event)?;
                }
                Err(e) => tracing::error!("Error processing event: {e}"),
            }
        }

        profiler.finalize_reporting();

        Ok((profiler, false))
    }

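    /// Constructs a pool profiler by streaming mint, burn, and initialize events from
    /// HyperSync, then hydrating the final state from an on-chain RPC snapshot.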
    async fn construct_pool_profiler_from_hypersync_rpc(
        &self,
        mut profiler: PoolProfiler,
        from_position: Option<BlockPosition>,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        tracing::info!(
            "Constructing pool profiler from hypersync stream and RPC final state querying"
        );
        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let initialize_event_signature =
            if let Some(initialize_event) = &dex_extended.initialize_event {
                initialize_event.as_ref()
            } else {
                anyhow::bail!(
                    "DEX {} does not have an initialize event set",
                    &profiler.pool.dex.name
                );
            };
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let initialize_sig_bytes = hex::decode(
            initialize_event_signature
                .strip_prefix("0x")
                .unwrap_or(initialize_event_signature),
        )?;

        let from_block = from_position.map_or(profiler.pool.creation_block, |block_position| {
            block_position.number
        });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        tracing::info!(
            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
            profiler.pool.address,
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                from_block,
                None,
                &profiler.pool.address,
                vec![
                    mint_event_signature,
                    burn_event_signature,
                    initialize_event_signature,
                ],
            )
            .await;
        tokio::pin!(pool_events_stream);

        while let Some(log) = pool_events_stream.next().await {
            let event_sig_bytes = extract_event_signature_bytes(&log)?;

            if event_sig_bytes == initialize_sig_bytes {
                let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
                profiler.initialize(initialize_event.sqrt_price_x96);
                self.cache
                    .database
                    .as_ref()
                    .unwrap()
                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
                    .await?;
            } else if event_sig_bytes == mint_sig_bytes {
                let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes {
                let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
                }
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                tracing::error!(
                    "Unexpected event signature in construct_pool_profiler_from_hypersync_rpc: {} for log {:?}",
                    event_signature,
                    log
                );
            }
        }

        profiler.finalize_reporting();

        match self.get_on_chain_snapshot(&profiler).await {
            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
            Err(e) => tracing::error!(
                "Failed to restore from on-chain snapshot: {e}. Sending non-hydrated state to the client."
            ),
        }

        Ok((profiler, true))
    }

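    /// Verifies the profiler state against the on-chain contract state (unless it was
    /// already validated) and marks the cached snapshot as valid on success.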
    pub async fn check_snapshot_validity(
        &self,
        profiler: &PoolProfiler,
        already_validated: bool,
    ) -> anyhow::Result<bool> {
        let (is_valid, block_position) = if already_validated {
            tracing::info!("Snapshot already validated from RPC, skipping on-chain comparison");
            let last_event = profiler
                .last_processed_event
                .clone()
                .expect("Profiler should have last_processed_event");
            (true, last_event)
        } else {
            match self.get_on_chain_snapshot(profiler).await {
                Ok(on_chain_snapshot) => {
                    tracing::info!("Comparing profiler state with on-chain state...");
                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
                    if !valid {
                        tracing::error!(
                            "Pool profiler state does NOT match on-chain smart contract state"
                        );
                    }
                    (valid, on_chain_snapshot.block_position)
                }
                Err(e) => {
                    tracing::error!("Failed to check snapshot validity: {e}");
                    return Ok(false);
                }
            }
        };

        if is_valid && let Some(cache_database) = &self.cache.database {
            cache_database
                .mark_pool_snapshot_valid(
                    profiler.pool.chain.chain_id,
                    &profiler.pool.pool_identifier,
                    block_position.number,
                    block_position.transaction_index,
                    block_position.log_index,
                )
                .await?;
            tracing::info!("Marked pool profiler snapshot as valid");
        }

        Ok(is_valid)
    }

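    /// Fetches the current on-chain snapshot for the profiler's pool; currently only
    /// Uniswap V3 pools are supported.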
    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
        if profiler.pool.dex.name == DexType::UniswapV3 {
            let last_processed_event = profiler
                .last_processed_event
                .clone()
                .expect("We expect at least one processed event in the pool");
            let on_chain_snapshot = self
                .univ3_pool
                .fetch_snapshot(
                    &profiler.pool.address,
                    profiler.pool.instrument_id,
                    profiler.get_active_tick_values().as_slice(),
                    &profiler.get_all_position_keys(),
                    last_processed_event,
                )
                .await?;

            Ok(on_chain_snapshot)
        } else {
            anyhow::bail!(
                "Fetching on-chain snapshots for DEX protocol {} is not supported yet",
                profiler.pool.dex.name
            )
        }
    }

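    /// Replays all cached historical events for the given pool over the data channel,
    /// so downstream consumers can hydrate their own profiler state.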
    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
        if let Some(database) = &self.cache.database {
            tracing::info!(
                "Replaying historical events for pool {} to hydrate profiler",
                pool.instrument_id
            );

            let mut event_stream = database.stream_pool_events(
                self.chain.clone(),
                dex.clone(),
                pool.instrument_id,
                pool.pool_identifier,
                None,
            );
            let mut event_count = 0;

            while let Some(event_result) = event_stream.next().await {
                match event_result {
                    Ok(event) => {
                        let data_event = match event {
                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
                            DexPoolData::LiquidityUpdate(update) => {
                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
                            }
                            DexPoolData::FeeCollect(collect) => {
                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
                            }
                            DexPoolData::Flash(flash) => {
                                DataEvent::DeFi(DefiData::PoolFlash(flash))
                            }
                        };
                        self.send_data(data_event);
                        event_count += 1;
                    }
                    Err(e) => {
                        tracing::error!(
                            "Error streaming event for pool {}: {e}",
                            pool.instrument_id
                        );
                    }
                }
            }

            tracing::info!(
                "Replayed {event_count} historical events for pool {}",
                pool.instrument_id
            );
        } else {
            tracing::debug!(
                "No database available, skipping event replay for pool {}",
                pool.instrument_id
            );
        }

        Ok(())
    }

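    /// Determines the starting block for syncing: the configured `from_block` if set,
    /// otherwise the earliest DEX creation block known to the cache.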
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

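    /// Returns the [`DexExtended`] definition for a registered DEX, or an error if the
    /// DEX is unknown or not registered with this client.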
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("DEX {dex_id} doesn't exist for chain {}", self.chain.name),
        }
    }

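    /// Returns the cached pool for the given identifier, or an error if it is not registered.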
    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_identifier) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_identifier} is not registered"),
        }
    }

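    /// Sends a data event over the data channel, logging an error if no channel is set
    /// or the send fails.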
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            tracing::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                tracing::error!("Failed to send data: {e}");
            }
        } else {
            tracing::error!("No data event channel for sending data");
        }
    }

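    /// Disconnects the client by shutting down the HyperSync client.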
    pub async fn disconnect(&mut self) {
        self.hypersync_client.disconnect().await;
    }
}