use std::{cmp::max, sync::Arc};

use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_core::formatting::Separable;
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolProfiler, PoolSwap,
    SharedChain, SharedDex, SharedPool,
    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
};

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
    data::subscription::DefiDataSubscriptionManager,
    events::{
        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent, swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
    services::PoolDiscoveryService,
};

const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;

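/// Core client for syncing and streaming DeFi data (blocks, DEX pools, and pool events)
/// from HyperSync and chain-specific RPC endpoints, backed by an optional Postgres cache.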
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    pub chain: SharedChain,
    pub config: BlockchainDataClientConfig,
    pub cache: BlockchainCache,
    tokens: Erc20Contract,
    univ3_pool: UniswapV3PoolContract,
    pub hypersync_client: HyperSyncClient,
    pub rpc_client: Option<BlockchainRpcClientAny>,
    pub subscription_manager: DefiDataSubscriptionManager,
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    cancellation_token: tokio_util::sync::CancellationToken,
}

impl BlockchainDataClientCore {
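    /// Creates a new [`BlockchainDataClientCore`] instance from the given configuration and channels.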
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());

        log::info!(
            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
            chain.name,
            config.http_rpc_url
        );

        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
            log::info!("WebSocket RPC URL: {wss_rpc_url}");
            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
        } else {
            log::info!("Using HyperSync for live data (no WebSocket RPC)");
            None
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client.clone(),
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client =
            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            univ3_pool: UniswapV3PoolContract::new(http_rpc_client),
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
            cancellation_token,
        }
    }

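    /// Initializes the Postgres cache database if a cache database config is provided.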
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            log::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

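    /// Creates the chain-specific WebSocket RPC client.
    ///
    /// # Panics
    ///
    /// Panics if the blockchain is not supported for RPC connections.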
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
    ) -> BlockchainRpcClientAny {
        match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
            }
            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        }
    }

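    /// Connects the client: initializes the cache, connects the RPC client (if configured),
    /// and registers and syncs pools for each configured DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if connecting to the cache or syncing any DEX fails.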
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        log::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        log::info!(
            "Connecting to blockchain data source for '{}' from block {}",
            self.chain.name,
            from_block.separate_with_commas()
        );

        self.cache.initialize_chain().await;
        self.cache.connect(from_block).await?;

        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

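    /// Syncs blocks after checking cache consistency, filling any detected gaps with INSERT
    /// before continuing with the faster COPY path.
    ///
    /// # Errors
    ///
    /// Returns an error if any block sync phase fails.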
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            if blocks_status.is_consistent() {
                log::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                log::info!(
                    "Starting fast sync with COPY from block {}",
                    target_block.separate_with_commas()
                );
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                log::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                log::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                log::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    (blocks_status.max_block + 1).separate_with_commas()
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

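    /// Syncs blocks from `from_block` to `to_block` (or the current block if `None`),
    /// writing them to the cache in batches.
    ///
    /// # Errors
    ///
    /// Returns an error if the sync is cancelled or persisting a batch fails.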
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        const BATCH_SIZE: usize = 1000;

        let to_block = if let Some(block) = to_block {
            block
        } else {
            self.hypersync_client.current_block().await
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        log::info!(
            "Syncing blocks from {} to {} (total: {} blocks)",
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            log::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                log::info!("Block sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(block) = blocks_stream.next().await {
                    let block_number = block.number;
                    if self.cache.get_block_timestamp(block_number).is_some() {
                        continue;
                    }
                    batch.push(block);

                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
                        let batch_size = batch.len();

                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
                        metrics.update(batch_size);

                        batch = Vec::with_capacity(BATCH_SIZE);
                    }

                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                if !batch.is_empty() {
                    let batch_size = batch.len();
                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
                    metrics.update(batch_size);
                }

                metrics.log_final_stats();
                Ok(())
            } => result
        };

        sync_result?;

        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            log::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

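    /// Syncs swap, mint, burn, collect, flash, and initialize events for the given pool,
    /// resuming from the last synced block unless `reset` is set.
    ///
    /// # Errors
    ///
    /// Returns an error if the sync is cancelled or persisting event batches fails.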
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_identifier: PoolIdentifier,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        const EVENT_BATCH_SIZE: usize = 20000;

        let pool: SharedPool = self.get_pool(&pool_identifier)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);
        let pool_address = &pool.address;

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, &pool_identifier)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            log::info!(
412 "D {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(&pool_identifier)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        log::info!(
            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let flash_event_signature = dex_extended.flash_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        let swap_sig_bytes = hex::decode(
            swap_event_signature
                .strip_prefix("0x")
                .unwrap_or(swap_event_signature),
        )?;
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let collect_sig_bytes = hex::decode(
            collect_event_signature
                .strip_prefix("0x")
                .unwrap_or(collect_event_signature),
        )?;
        let flash_sig_bytes = flash_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
        let initialize_sig_bytes = initialize_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];
        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }
        if let Some(event) = dex_extended.flash_created_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);

        let mut beyond_stale_data = last_block_across_pool_events_table
            .is_none_or(|tables_max| effective_from_block > tables_max);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                log::info!("Pool event sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(log) = pool_events_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    blocks_processed += block_number - last_block_saved;
                    last_block_saved = block_number;

                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
                    if event_sig_bytes == swap_sig_bytes.as_slice() {
                        let swap_event = dex_extended.parse_swap_event_hypersync(log)?;
                        match self.process_pool_swap_event(&swap_event, &pool) {
                            Ok(swap) => swap_batch.push(swap),
                            Err(e) => log::error!("Failed to process swap event: {e}"),
                        }
                    } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                        let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
                        match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                            Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                            Err(e) => log::error!("Failed to process mint event: {e}"),
                        }
                    } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                        let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
                        match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                            Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                            Err(e) => log::error!("Failed to process burn event: {e}"),
                        }
                    } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                        let collect_event = dex_extended.parse_collect_event_hypersync(log)?;
                        match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                            Ok(fee_collect) => collect_batch.push(fee_collect),
                            Err(e) => log::error!("Failed to process collect event: {e}"),
                        }
                    } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                        let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
                        self.cache
                            .update_pool_initialize_price_tick(&initialize_event)
                            .await?;
                    } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                        if let Some(parse_fn) = dex_extended.parse_flash_event_hypersync_fn {
                            match parse_fn(dex_extended.dex.clone(), log) {
                                Ok(flash_event) => {
                                    match self.process_pool_flash_event(&flash_event, &pool) {
                                        Ok(flash) => flash_batch.push(flash),
                                        Err(e) => log::error!("Failed to process flash event: {e}"),
                                    }
                                }
                                Err(e) => log::error!("Failed to parse flash event: {e}"),
                            }
                        }
                    } else {
                        let event_signature = hex::encode(event_sig_bytes);
                        log::error!(
                            "Unexpected event signature: {event_signature} for log {log:?}"
                        );
                    }

                    if !beyond_stale_data
                        && last_block_across_pool_events_table
                            .is_some_and(|table_max| block_number > table_max)
                    {
                        log::info!(
                            "Crossed beyond stale data at block {block_number} - flushing current batches with ON CONFLICT, then switching to COPY"
                        );

                        self.flush_event_batches(
                            EVENT_BATCH_SIZE,
                            &mut swap_batch,
                            &mut liquidity_batch,
                            &mut collect_batch,
                            &mut flash_batch,
                            false,
                            true,
                        )
                        .await?;

                        beyond_stale_data = true;
                        log::info!("Switched to COPY mode - future batches will use COPY command");
                    } else {
                        self.flush_event_batches(
                            EVENT_BATCH_SIZE,
                            &mut swap_batch,
                            &mut liquidity_batch,
                            &mut collect_batch,
                            &mut flash_batch,
                            false,
                            false,
                        )
                        .await?;
                    }

                    metrics.update(blocks_processed as usize);
                    blocks_processed = 0;

                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                        self.cache
                            .update_pool_last_synced_block(dex, &pool_identifier, block_number)
                            .await?;
                    }
                }

                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    false,
                    true,
                )
                .await?;

                metrics.log_final_stats();
                self.cache
                    .update_pool_last_synced_block(dex, &pool_identifier, to_block)
                    .await?;

                log::info!(
                    "Successfully synced Dex '{}' Pool '{}' up to block {}",
                    dex,
                    pool_display,
                    to_block.separate_with_commas()
                );
                Ok(())
            } => result
        };

        sync_result
    }

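    /// Flushes any event batch that exceeds `event_batch_size` (or all non-empty batches when
    /// `force_flush_all` is set) to the cache.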
    #[allow(clippy::too_many_arguments)]
    async fn flush_event_batches(
        &mut self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        flash_batch: &mut Vec<PoolFlash>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }
        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
            && !liquidity_batch.is_empty()
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }
        if (force_flush_all || collect_batch.len() >= event_batch_size) && !collect_batch.is_empty()
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }
        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
            self.cache.add_pool_flash_batch(flash_batch).await?;
            flash_batch.clear();
        }
        Ok(())
    }

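    /// Converts a parsed swap event into a [`PoolSwap`] with calculated trade info.
    ///
    /// # Errors
    ///
    /// Returns an error if trade info calculation fails.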
    pub fn process_pool_swap_event(
        &self,
        swap_event: &SwapEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolSwap> {
        let timestamp = self
            .cache
            .get_block_timestamp(swap_event.block_number)
            .copied();
        let mut swap = swap_event.to_pool_swap(
            self.chain.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            timestamp,
        );
        swap.calculate_trade_info(&pool.token0, &pool.token1, None)?;

        Ok(swap)
    }

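    /// Converts a parsed mint event into a [`PoolLiquidityUpdate`].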
    pub fn process_pool_mint_event(
        &self,
        mint_event: &MintEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(mint_event.block_number)
            .copied();

        let liquidity_update = mint_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            timestamp,
        );

        Ok(liquidity_update)
    }

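    /// Converts a parsed burn event into a [`PoolLiquidityUpdate`].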
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            timestamp,
        );

        Ok(liquidity_update)
    }

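    /// Converts a parsed collect event into a [`PoolFeeCollect`].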
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            timestamp,
        );

        Ok(fee_collect)
    }

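    /// Converts a parsed flash loan event into a [`PoolFlash`].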
    pub fn process_pool_flash_event(
        &self,
        flash_event: &FlashEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolFlash> {
        let timestamp = self
            .cache
            .get_block_timestamp(flash_event.block_number)
            .copied();

        let flash = flash_event.to_pool_flash(self.chain.clone(), pool.instrument_id, timestamp);

        Ok(flash)
    }

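    /// Discovers and syncs all pools for the given DEX using the pool discovery service.
    ///
    /// # Errors
    ///
    /// Returns an error if pool discovery or syncing fails.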
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let dex_extended = self.get_dex_extended(dex)?.clone();

        let mut service = PoolDiscoveryService::new(
            self.chain.clone(),
            &mut self.cache,
            &self.tokens,
            &self.hypersync_client,
            self.cancellation_token.clone(),
            self.config.clone(),
        );

        service
            .sync_pools(&dex_extended, from_block, to_block, reset)
            .await?;

        Ok(())
    }

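    /// Registers the given DEX with the cache and subscription manager, loading its cached pools.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX is unknown for the configured chain.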
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            log::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            let _ = self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
                dex_extended.collect_created_event.as_ref(),
                dex_extended.flash_created_event.as_deref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

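    /// Bootstraps a [`PoolProfiler`] to the latest state by restoring the most recent valid
    /// snapshot (if any) and replaying cached pool events on top of it.
    ///
    /// Returns the profiler together with a flag indicating whether its state was already
    /// validated against on-chain data.
    ///
    /// # Errors
    ///
    /// Returns an error if the cache database is not initialized or event processing fails.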
    pub async fn bootstrap_latest_pool_profiler(
        &mut self,
        pool: &SharedPool,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        log::info!(
            "Bootstrapping latest pool profiler for pool {}",
            pool.address
        );

        if self.cache.database.is_none() {
            anyhow::bail!(
                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
            );
        }

        let mut profiler = PoolProfiler::new(pool.clone());

        let from_position = match self
            .cache
            .database
            .as_ref()
            .unwrap()
            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.pool_identifier)
            .await
        {
            Ok(Some(snapshot)) => {
                log::info!(
                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
                    snapshot.block_position.number.separate_with_commas(),
                    snapshot.positions.len(),
                    snapshot.ticks.len()
                );
                let block_position = snapshot.block_position.clone();
                profiler.restore_from_snapshot(snapshot)?;
                log::info!("Restored profiler from snapshot");
                Some(block_position)
            }
            _ => {
                log::info!("No valid snapshot found, processing from beginning");
                None
            }
        };

        if self
            .cache
            .database
            .as_ref()
            .unwrap()
            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.pool_identifier)
            .await?
            .is_none()
        {
            return self
                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
                .await;
        }

        if let Err(e) = self
            .sync_pool_events(&pool.dex.name, pool.pool_identifier, None, None, false)
            .await
        {
            log::error!("Failed to sync pool events for snapshot request: {e}");
        }

        if !profiler.is_initialized {
            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
                profiler.initialize(initial_sqrt_price_x96);
            } else {
                anyhow::bail!(
990 "Pool is not initialized and it doesn't contain initial price, cannot bootstrap profiler"
                );
            }
        }

        let from_block = from_position
            .as_ref()
            .map_or(profiler.pool.creation_block, |block_position| {
                block_position.number
            });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
            pool.chain.clone(),
            pool.dex.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            from_position.clone(),
        );

        while let Some(result) = stream.next().await {
            match result {
                Ok(event) => {
                    profiler.process(&event)?;
                }
                Err(e) => log::error!("Error processing event: {e}"),
            }
        }

        profiler.finalize_reporting();

        Ok((profiler, false))
    }

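    /// Builds the profiler state by streaming mint, burn, and initialize events from HyperSync
    /// and then restoring the final state from an on-chain RPC snapshot.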
    async fn construct_pool_profiler_from_hypersync_rpc(
        &self,
        mut profiler: PoolProfiler,
        from_position: Option<BlockPosition>,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        log::info!("Constructing pool profiler from hypersync stream and RPC final state querying");
        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let initialize_event_signature =
            if let Some(initialize_event) = &dex_extended.initialize_event {
                initialize_event.as_ref()
            } else {
                anyhow::bail!(
1061 "DEX {} does not have initialize event set.",
                    &profiler.pool.dex.name
                );
            };
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let initialize_sig_bytes = hex::decode(
            initialize_event_signature
                .strip_prefix("0x")
                .unwrap_or(initialize_event_signature),
        )?;

        let from_block = from_position.map_or(profiler.pool.creation_block, |block_position| {
            block_position.number
        });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        log::info!(
            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
            profiler.pool.address,
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                from_block,
                None,
                &profiler.pool.address,
                vec![
                    mint_event_signature,
                    burn_event_signature,
                    initialize_event_signature,
                ],
            )
            .await;
        tokio::pin!(pool_events_stream);

        while let Some(log) = pool_events_stream.next().await {
            let event_sig_bytes = extract_event_signature_bytes(&log)?;

            if event_sig_bytes == initialize_sig_bytes {
                let initialize_event = dex_extended.parse_initialize_event_hypersync(log)?;
                profiler.initialize(initialize_event.sqrt_price_x96);
                self.cache
                    .database
                    .as_ref()
                    .unwrap()
                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
                    .await?;
            } else if event_sig_bytes == mint_sig_bytes {
                let mint_event = dex_extended.parse_mint_event_hypersync(log)?;
                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => log::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes {
                let burn_event = dex_extended.parse_burn_event_hypersync(log)?;
                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => log::error!("Failed to process burn event: {e}"),
                }
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                log::error!(
                    "Unexpected event signature in bootstrap_latest_pool_profiler: {event_signature} for log {log:?}"
                );
            }
        }

        profiler.finalize_reporting();

        match self.get_on_chain_snapshot(&profiler).await {
            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
            Err(e) => log::error!(
1155 "Failed to restore from on-chain snapshot: {e}. Sending not hydrated state to client."
            ),
        }

        Ok((profiler, true))
    }

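    /// Validates the profiler state against the on-chain contract state and, if valid, marks the
    /// corresponding pool snapshot as valid in the cache database.
    ///
    /// # Errors
    ///
    /// Returns an error if marking the snapshot as valid in the database fails.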
    pub async fn check_snapshot_validity(
        &self,
        profiler: &PoolProfiler,
        already_validated: bool,
    ) -> anyhow::Result<bool> {
        let (is_valid, block_position) = if already_validated {
            log::info!("Snapshot already validated from RPC, skipping on-chain comparison");
            let last_event = profiler
                .last_processed_event
                .clone()
                .expect("Profiler should have last_processed_event");
            (true, last_event)
        } else {
            match self.get_on_chain_snapshot(profiler).await {
                Ok(on_chain_snapshot) => {
                    log::info!("Comparing profiler state with on-chain state...");
                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
                    if !valid {
                        log::error!(
                            "Pool profiler state does NOT match on-chain smart contract state"
                        );
                    }
                    (valid, on_chain_snapshot.block_position)
                }
                Err(e) => {
                    log::error!("Failed to check snapshot validity: {e}");
                    return Ok(false);
                }
            }
        };

        if is_valid && let Some(cache_database) = &self.cache.database {
            cache_database
                .mark_pool_snapshot_valid(
                    profiler.pool.chain.chain_id,
                    &profiler.pool.pool_identifier,
                    block_position.number,
                    block_position.transaction_index,
                    block_position.log_index,
                )
                .await?;
            log::info!("Marked pool profiler snapshot as valid");
        }

        Ok(is_valid)
    }

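    /// Fetches the current on-chain snapshot for the profiler's pool (UniswapV3 only).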
    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
        if profiler.pool.dex.name == DexType::UniswapV3 {
            let last_processed_event = profiler
                .last_processed_event
                .clone()
                .expect("We expect at least one processed event in the pool");
            let on_chain_snapshot = self
                .univ3_pool
                .fetch_snapshot(
                    &profiler.pool.address,
                    profiler.pool.instrument_id,
                    profiler.get_active_tick_values().as_slice(),
                    &profiler.get_all_position_keys(),
                    last_processed_event,
                )
                .await?;

            Ok(on_chain_snapshot)
        } else {
            anyhow::bail!(
                "Fetching on-chain snapshot for Dex protocol {} is not supported yet",
                profiler.pool.dex.name
            )
        }
    }

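    /// Replays all cached historical events for the given pool through the data channel,
    /// allowing downstream consumers to hydrate their own state.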
    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
        if let Some(database) = &self.cache.database {
            log::info!(
                "Replaying historical events for pool {} to hydrate profiler",
                pool.instrument_id
            );

            let mut event_stream = database.stream_pool_events(
                self.chain.clone(),
                dex.clone(),
                pool.instrument_id,
                pool.pool_identifier,
                None,
            );
            let mut event_count = 0;

            while let Some(event_result) = event_stream.next().await {
                match event_result {
                    Ok(event) => {
                        let data_event = match event {
                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
                            DexPoolData::LiquidityUpdate(update) => {
                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
                            }
                            DexPoolData::FeeCollect(collect) => {
                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
                            }
                            DexPoolData::Flash(flash) => {
                                DataEvent::DeFi(DefiData::PoolFlash(flash))
                            }
                        };
                        self.send_data(data_event);
                        event_count += 1;
                    }
                    Err(e) => {
                        log::error!("Error streaming event for pool {}: {e}", pool.instrument_id);
                    }
                }
            }

            log::info!(
                "Replayed {event_count} historical events for pool {}",
                pool.instrument_id
            );
        } else {
            log::debug!(
                "No database available, skipping event replay for pool {}",
                pool.instrument_id
            );
        }

        Ok(())
    }

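    /// Determines the starting block: the configured `from_block`, falling back to the earliest
    /// DEX creation block known to the cache (or 0).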
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

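    /// Returns the extended DEX definition for a DEX registered in the data client.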
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("Dex {dex_id} doesn't exist for chain {}", self.chain.name),
        }
    }

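    /// Returns the cached pool for the given identifier.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool is not registered in the cache.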
    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_identifier) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_identifier} is not registered"),
        }
    }

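    /// Sends a data event over the registered data channel, logging an error if no channel is set.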
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            log::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                log::error!("Failed to send data: {e}");
            }
        } else {
            log::error!("No data event channel for sending data");
        }
    }

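    /// Disconnects the client by shutting down the HyperSync client.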
    pub async fn disconnect(&mut self) {
        self.hypersync_client.disconnect().await;
    }
}