use std::{cmp::max, collections::HashSet, sync::Arc};

use alloy::primitives::Address;
use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_core::UnixNanos;
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolProfiler, PoolSwap, SharedChain,
    SharedDex, SharedPool, Token,
    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
};
use thousands::Separable;

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
    data::subscription::DefiDataSubscriptionManager,
    events::{
        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent,
        pool_created::PoolCreatedEvent, swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
};

/// Number of blocks processed between sync progress log reports.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;

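/// Core blockchain data client for syncing and streaming DeFi data.
///
/// Coordinates the HyperSync client, an optional WebSocket RPC client, on-chain
/// contract helpers, and the blockchain cache to sync blocks, DEX pools, and
/// pool events, forwarding results to the consumer as [`DataEvent`]s.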
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain this client operates on.
    pub chain: SharedChain,
    /// Configuration for the data client.
    pub config: BlockchainDataClientConfig,
    /// Cache for blockchain entities (blocks, tokens, pools, pool events).
    pub cache: BlockchainCache,
    /// ERC20 contract helper for fetching token metadata.
    tokens: Erc20Contract,
    /// Uniswap V3 pool contract helper for querying on-chain pool state.
    univ3_pool: UniswapV3PoolContract,
    /// HyperSync client for fast historical block and log streaming.
    pub hypersync_client: HyperSyncClient,
    /// Optional chain-specific WebSocket RPC client for live data.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manages DeFi data subscriptions per DEX.
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel for sending data events to the consumer.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    /// Token used to cancel long-running sync operations.
    cancellation_token: tokio_util::sync::CancellationToken,
}

impl BlockchainDataClientCore {
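    /// Creates a new [`BlockchainDataClientCore`] instance for the configured
    /// chain, wiring up the HyperSync client, HTTP RPC contract helpers, and an
    /// optional WebSocket RPC client.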
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());

        tracing::info!(
            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
            chain.name,
            config.http_rpc_url
        );

        let rpc_client = match config.wss_rpc_url.clone() {
            Some(wss_rpc_url) if !config.use_hypersync_for_live_data => {
                tracing::info!("WebSocket RPC URL: {wss_rpc_url}");
                Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
            }
            _ => {
                tracing::info!("Using HyperSync for live data (no WebSocket RPC)");
                None
            }
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client.clone(),
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client =
            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            univ3_pool: UniswapV3PoolContract::new(http_rpc_client.clone()),
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
            cancellation_token,
        }
    }

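    /// Initializes the Postgres-backed cache database when a database
    /// configuration is provided, otherwise this is a no-op.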
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            tracing::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

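    /// Creates the chain-specific WebSocket RPC client.
    ///
    /// # Panics
    ///
    /// Panics if the blockchain is not supported for RPC connections.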
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
    ) -> BlockchainRpcClientAny {
        match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
            }
            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        }
    }

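    /// Connects the data client: initializes the cache database, connects the
    /// RPC client (if configured), loads cached state from the determined
    /// starting block, then registers and syncs pools for each configured DEX.
    ///
    /// # Example
    ///
    /// An illustrative sketch (configuration values are placeholders; not compiled):
    ///
    /// ```ignore
    /// let mut client = BlockchainDataClientCore::new(
    ///     config,
    ///     None,
    ///     None,
    ///     tokio_util::sync::CancellationToken::new(),
    /// );
    /// client.connect().await?;
    /// ```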
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        tracing::info!(
            "Connecting to blockchain data source for '{}' from block {}",
            self.chain.name,
            from_block.separate_with_commas()
        );

        self.cache.initialize_chain().await;
        self.cache.connect(from_block).await?;

        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

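    /// Syncs blocks after first checking cache consistency.
    ///
    /// If the cached blocks are contiguous, syncing continues from the cached
    /// maximum using the fast COPY path. If a gap is detected, the gap is first
    /// filled with INSERT (Phase 1), then syncing continues with COPY (Phase 2).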
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            if blocks_status.is_consistent() {
                tracing::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                tracing::info!(
                    "Starting fast sync with COPY from block {}",
                    target_block.separate_with_commas()
                );
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                tracing::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                tracing::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                tracing::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    (blocks_status.max_block + 1).separate_with_commas()
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

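    /// Syncs blocks from HyperSync into the cache over the given range,
    /// batching database writes and using the Postgres COPY path when
    /// `use_copy_command` is `true` (INSERT otherwise). When `to_block` is
    /// `None`, syncing runs up to the current chain head.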
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        tracing::info!(
            "Syncing blocks from {} to {} (total: {} blocks)",
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        const BATCH_SIZE: usize = 1000;
        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                tracing::info!("Block sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(block) = blocks_stream.next().await {
                    let block_number = block.number;
                    // Skip blocks already present in the cache.
                    if self.cache.get_block_timestamp(block_number).is_some() {
                        continue;
                    }
                    batch.push(block);

                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
                        let batch_size = batch.len();

                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
                        metrics.update(batch_size);

                        batch = Vec::with_capacity(BATCH_SIZE);
                    }

                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                if !batch.is_empty() {
                    let batch_size = batch.len();
                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
                    metrics.update(batch_size);
                }

                metrics.log_final_stats();
                Ok(())
            } => result
        };

        sync_result?;

        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

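    /// Syncs all events (swaps, mints, burns, collects, flashes, initialize)
    /// for the given pool over the block range, resuming from the last synced
    /// block unless `reset` is `true`.
    ///
    /// # Example
    ///
    /// An illustrative sketch (identifiers are placeholders; not compiled):
    ///
    /// ```ignore
    /// client
    ///     .sync_pool_events(&DexType::UniswapV3, &pool_address, None, None, false)
    ///     .await?;
    /// ```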
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_address: &Address,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let pool: SharedPool = self.get_pool(pool_address)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, pool_address)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            tracing::info!(
                "Pool on DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(pool_address)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let flash_event_signature = dex_extended.flash_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        // Decode hex event signatures (with or without a `0x` prefix) into raw bytes.
        let decode_sig = |sig: &str| hex::decode(sig.strip_prefix("0x").unwrap_or(sig));
        let swap_sig_bytes = decode_sig(swap_event_signature)?;
        let mint_sig_bytes = decode_sig(mint_event_signature)?;
        let burn_sig_bytes = decode_sig(burn_event_signature)?;
        let collect_sig_bytes = decode_sig(collect_event_signature)?;
        // An empty signature (failed decode) simply never matches a log below.
        let flash_sig_bytes = flash_event_signature.map(|s| decode_sig(s).unwrap_or_default());
        let initialize_sig_bytes =
            initialize_event_signature.map(|s| decode_sig(s).unwrap_or_default());

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];
        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }
        if let Some(event) = dex_extended.flash_created_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        const EVENT_BATCH_SIZE: usize = 20_000;
        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);

        // Tracks whether we have moved past blocks that may already exist in the
        // event tables; before that point batches are flushed with INSERT ... ON
        // CONFLICT, afterwards the faster COPY path can be used safely.
        let mut beyond_stale_data = last_block_across_pool_events_table
            .is_none_or(|tables_max| effective_from_block > tables_max);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                tracing::info!("Pool event sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(log) = pool_events_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    blocks_processed += block_number - last_block_saved;
                    last_block_saved = block_number;

                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
                    if event_sig_bytes == swap_sig_bytes.as_slice() {
                        let swap_event = dex_extended.parse_swap_event(log)?;
                        match self.process_pool_swap_event(&swap_event, &pool, &dex_extended) {
                            Ok(swap) => swap_batch.push(swap),
                            Err(e) => tracing::error!("Failed to process swap event: {e}"),
                        }
                    } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                        let mint_event = dex_extended.parse_mint_event(log)?;
                        match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                            Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                            Err(e) => tracing::error!("Failed to process mint event: {e}"),
                        }
                    } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                        let burn_event = dex_extended.parse_burn_event(log)?;
                        match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                            Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                            Err(e) => tracing::error!("Failed to process burn event: {e}"),
                        }
                    } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                        let collect_event = dex_extended.parse_collect_event(log)?;
                        match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                            Ok(fee_collect) => collect_batch.push(fee_collect),
                            Err(e) => tracing::error!("Failed to process collect event: {e}"),
                        }
                    } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                        let initialize_event = dex_extended.parse_initialize_event(log)?;
                        self.cache
                            .update_pool_initialize_price_tick(&initialize_event)
                            .await?;
                    } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                        if let Some(parse_fn) = dex_extended.parse_flash_event_fn {
                            match parse_fn(dex_extended.dex.clone(), log) {
                                Ok(flash_event) => {
                                    match self.process_pool_flash_event(&flash_event, &pool) {
                                        Ok(flash) => flash_batch.push(flash),
                                        Err(e) => tracing::error!("Failed to process flash event: {e}"),
                                    }
                                }
                                Err(e) => tracing::error!("Failed to parse flash event: {e}"),
                            }
                        }
                    } else {
                        let event_signature = hex::encode(event_sig_bytes);
                        tracing::error!(
                            "Unexpected event signature: {} for log {:?}",
                            event_signature,
                            log
                        );
                    }

                    if !beyond_stale_data
                        && last_block_across_pool_events_table
                            .is_some_and(|table_max| block_number > table_max)
                    {
                        tracing::info!(
                            "Crossed beyond stale data at block {} - flushing current batches with ON CONFLICT, then switching to COPY",
                            block_number
                        );

                        self.flush_event_batches(
                            EVENT_BATCH_SIZE,
                            &mut swap_batch,
                            &mut liquidity_batch,
                            &mut collect_batch,
                            &mut flash_batch,
                            false,
                            true,
                        )
                        .await?;

                        beyond_stale_data = true;
                        tracing::info!("Switched to COPY mode - future batches will use COPY command");
                    } else {
                        // Use the fast COPY path once past previously synced rows
                        // (matches the mode switch logged above).
                        self.flush_event_batches(
                            EVENT_BATCH_SIZE,
                            &mut swap_batch,
                            &mut liquidity_batch,
                            &mut collect_batch,
                            &mut flash_batch,
                            beyond_stale_data,
                            false,
                        )
                        .await?;
                    }

                    metrics.update(blocks_processed as usize);
                    blocks_processed = 0;

                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                        self.cache
                            .update_pool_last_synced_block(dex, pool_address, block_number)
                            .await?;
                    }
                }

                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    beyond_stale_data,
                    true,
                )
                .await?;

                metrics.log_final_stats();
                self.cache
                    .update_pool_last_synced_block(dex, pool_address, to_block)
                    .await?;

                tracing::info!(
                    "Successfully synced DEX '{}' Pool '{}' up to block {}",
                    dex,
                    pool_display,
                    to_block.separate_with_commas()
                );
                Ok(())
            } => result
        };

        sync_result
    }

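    /// Flushes buffered pool event batches to the cache database.
    ///
    /// Each batch is written once it reaches `event_batch_size`, or
    /// unconditionally when `force_flush_all` is `true`.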
    async fn flush_event_batches(
        &mut self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        flash_batch: &mut Vec<PoolFlash>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }
        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
            && !liquidity_batch.is_empty()
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }
        if (force_flush_all || collect_batch.len() >= event_batch_size)
            && !collect_batch.is_empty()
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }
        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
            // Flash batches have no COPY variant and always use the default insert path.
            self.cache.add_pool_flash_batch(flash_batch).await?;
            flash_batch.clear();
        }
        Ok(())
    }

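    /// Converts a raw swap event into a [`PoolSwap`], enriching it with trade
    /// data (side, size, price) derived from the pool's token pair and the
    /// cached block timestamp (if available).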
    pub fn process_pool_swap_event(
        &self,
        swap_event: &SwapEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolSwap> {
        let timestamp = self
            .cache
            .get_block_timestamp(swap_event.block_number)
            .copied();
        let (side, size, price) =
            dex_extended.convert_to_trade_data(&pool.token0, &pool.token1, swap_event)?;
        let swap = swap_event.to_pool_swap(
            self.chain.clone(),
            pool.instrument_id,
            pool.address,
            Some(side),
            Some(size),
            Some(price),
            timestamp,
        );

        Ok(swap)
    }

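    /// Converts a raw mint event into a [`PoolLiquidityUpdate`] using the
    /// cached block timestamp (if available).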
    pub fn process_pool_mint_event(
        &self,
        mint_event: &MintEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(mint_event.block_number)
            .copied();

        let liquidity_update = mint_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(liquidity_update)
    }

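    /// Converts a raw burn event into a [`PoolLiquidityUpdate`] using the
    /// cached block timestamp (if available).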
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(liquidity_update)
    }

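    /// Converts a raw collect event into a [`PoolFeeCollect`] using the
    /// cached block timestamp (if available).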
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(fee_collect)
    }

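    /// Converts a raw flash event into a [`PoolFlash`] using the cached block
    /// timestamp (if available).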
    pub fn process_pool_flash_event(
        &self,
        flash_event: &FlashEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolFlash> {
        let timestamp = self
            .cache
            .get_block_timestamp(flash_event.block_number)
            .copied();

        let flash = flash_event.to_pool_flash(
            self.chain.clone(),
            pool.instrument_id,
            pool.address,
            timestamp,
        );

        Ok(flash)
    }

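    /// Syncs pool-creation events for the given DEX from its factory contract,
    /// batching token metadata fetches over RPC and persisting tokens and pools
    /// to the cache. Resumes from the DEX's last synced block unless `reset` is
    /// `true`.
    ///
    /// # Example
    ///
    /// An illustrative sketch (identifiers are placeholders; not compiled):
    ///
    /// ```ignore
    /// client
    ///     .sync_exchange_pools(&DexType::UniswapV3, from_block, None, false)
    ///     .await?;
    /// ```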
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(dex).await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        if effective_from_block > to_block {
            tracing::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        tracing::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            }
        );

        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            tracing::warn!("Failed to enable performance settings: {e}");
        }

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let dex = self.get_dex_extended(dex)?.clone();
        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        tokio::pin!(pools_stream);

        // Each token needs three metadata calls (name, symbol, decimals), so size
        // the RPC batch at a third of the multicall budget per request.
        let token_rpc_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
        let mut token_rpc_buffer: HashSet<Address> = HashSet::new();

        const POOL_DB_BATCH_SIZE: usize = 2000;
        let mut token_db_buffer: Vec<Token> = Vec::new();
        let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();

        let mut last_block_saved = effective_from_block;

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                tracing::info!("Exchange pool sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(log) = pools_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    let blocks_progress = block_number - last_block_saved;
                    last_block_saved = block_number;

                    let pool = dex.parse_pool_created_event(log)?;
                    if self.cache.get_pool(&pool.pool_address).is_some() {
                        continue;
                    }

                    if self.cache.is_invalid_token(&pool.token0)
                        || self.cache.is_invalid_token(&pool.token1)
                    {
                        continue;
                    }

                    if self.cache.get_token(&pool.token0).is_none() {
                        token_rpc_buffer.insert(pool.token0);
                    }
                    if self.cache.get_token(&pool.token1).is_none() {
                        token_rpc_buffer.insert(pool.token1);
                    }

                    pool_events_buffer.push(pool);

                    if token_rpc_buffer.len() >= token_rpc_batch_size {
                        let fetched_tokens = self
                            .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                            .await?;

                        token_db_buffer.extend(fetched_tokens);
                    }

                    if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
                        if !token_rpc_buffer.is_empty() {
                            let fetched_tokens = self
                                .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                                .await?;
                            token_db_buffer.extend(fetched_tokens);
                        }

                        if !token_db_buffer.is_empty() {
                            self.cache
                                .add_tokens_batch(token_db_buffer.drain(..).collect())
                                .await?;
                        }

                        let pools = self
                            .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                            .await?;
                        self.cache.add_pools_batch(pools).await?;
                    }

                    metrics.update(blocks_progress as usize);
                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                if !token_rpc_buffer.is_empty() {
                    let fetched_tokens = self
                        .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                        .await?;
                    token_db_buffer.extend(fetched_tokens);
                }

                if !token_db_buffer.is_empty() {
                    self.cache
                        .add_tokens_batch(token_db_buffer.drain(..).collect())
                        .await?;
                }

                if !pool_events_buffer.is_empty() {
                    let pools = self
                        .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                        .await?;
                    self.cache.add_pools_batch(pools).await?;
                }

                metrics.log_final_stats();

                self.cache
                    .update_dex_last_synced_block(&dex.dex.name, to_block)
                    .await?;

                tracing::info!(
                    "Successfully synced DEX {} pools up to block {}",
                    dex.dex.name,
                    to_block.separate_with_commas()
                );

                Ok(())
            } => result
        };

        sync_result?;

        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            tracing::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

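    /// Fetches token metadata for the buffered addresses over RPC, caching
    /// valid tokens in memory and recording invalid ones, then returns the
    /// valid tokens for database persistence. The buffer is drained.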
    async fn fetch_and_cache_tokens_in_memory(
        &mut self,
        token_buffer: &mut HashSet<Address>,
    ) -> anyhow::Result<Vec<Token>> {
        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
        let token_infos = self.tokens.batch_fetch_token_info(&batch_addresses).await?;

        let mut valid_tokens = Vec::new();

        for (token_address, token_info) in token_infos {
            match token_info {
                Ok(token_info) => {
                    let token = Token::new(
                        self.chain.clone(),
                        token_address,
                        token_info.name,
                        token_info.symbol,
                        token_info.decimals,
                    );

                    self.cache.insert_token_in_memory(token.clone());

                    valid_tokens.push(token);
                }
                Err(token_info_error) => {
                    self.cache.insert_invalid_token_in_memory(token_address);
                    if let Some(database) = &self.cache.database {
                        database
                            .add_invalid_token(
                                self.chain.chain_id,
                                &token_address,
                                &token_info_error.to_string(),
                            )
                            .await?;
                    }
                }
            }
        }

        Ok(valid_tokens)
    }

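    /// Constructs [`Pool`] instances from buffered pool-created events,
    /// skipping pools whose tokens are missing from the cache or marked
    /// invalid. The event buffer is drained.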
    async fn construct_pools_batch(
        &mut self,
        pool_events: &mut Vec<PoolCreatedEvent>,
        dex: &SharedDex,
    ) -> anyhow::Result<Vec<Pool>> {
        let mut pools = Vec::with_capacity(pool_events.len());

        for pool_event in pool_events.drain(..) {
            let token0 = match self.cache.get_token(&pool_event.token0) {
                Some(token) => token.clone(),
                None => {
                    if !self.cache.is_invalid_token(&pool_event.token0) {
                        tracing::warn!(
                            "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
                            pool_event.pool_address,
                            pool_event.token0
                        );
                    }
                    continue;
                }
            };

            let token1 = match self.cache.get_token(&pool_event.token1) {
                Some(token) => token.clone(),
                None => {
                    if !self.cache.is_invalid_token(&pool_event.token1) {
                        tracing::warn!(
                            "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
                            pool_event.pool_address,
                            pool_event.token1
                        );
                    }
                    continue;
                }
            };

            let pool = Pool::new(
                self.chain.clone(),
                dex.clone(),
                pool_event.pool_address,
                pool_event.block_number,
                token0,
                token1,
                pool_event.fee,
                pool_event.tick_spacing,
                UnixNanos::default(),
            );

            pools.push(pool);
        }

        Ok(pools)
    }

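    /// Registers the given DEX for this chain: persists it in the cache, loads
    /// its pools, and registers its event signatures for subscriptions.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX is unknown for the configured chain.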
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            tracing::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            let _ = self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
                dex_extended.collect_created_event.as_ref(),
                dex_extended.flash_created_event.as_deref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

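    /// Bootstraps a [`PoolProfiler`] with the latest state for the given pool.
    ///
    /// Restores from the latest valid database snapshot when available, then
    /// replays subsequent pool events. If the pool has never been synced, the
    /// profiler is built from the HyperSync stream and on-chain RPC state
    /// instead. Returns the profiler and whether it was already validated
    /// against on-chain state during construction.
    ///
    /// # Example
    ///
    /// An illustrative sketch (assumes a connected client; not compiled):
    ///
    /// ```ignore
    /// let (profiler, validated) = client.bootstrap_latest_pool_profiler(&pool).await?;
    /// let is_valid = client.check_snapshot_validity(&profiler, validated).await?;
    /// ```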
    pub async fn bootstrap_latest_pool_profiler(
        &mut self,
        pool: &SharedPool,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        tracing::info!(
            "Bootstrapping latest pool profiler for pool {}",
            pool.address
        );

        if self.cache.database.is_none() {
            anyhow::bail!("Database is not initialized; cannot bootstrap the latest pool profiler");
        }

        let mut profiler = PoolProfiler::new(pool.clone());

        let from_position = match self
            .cache
            .database
            .as_ref()
            .unwrap()
            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.address)
            .await
        {
            Ok(Some(snapshot)) => {
                tracing::info!(
                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
                    snapshot.block_position.number.separate_with_commas(),
                    snapshot.positions.len(),
                    snapshot.ticks.len()
                );
                let block_position = snapshot.block_position.clone();
                profiler.restore_from_snapshot(snapshot)?;
                tracing::info!("Restored profiler from snapshot");
                Some(block_position)
            }
            _ => {
                tracing::info!("No valid snapshot found, processing from beginning");
                None
            }
        };

        // If the pool has never been synced, build the profiler from the
        // HyperSync stream and final on-chain RPC state instead.
        if self
            .cache
            .database
            .as_ref()
            .unwrap()
            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.address)
            .await?
            .is_none()
        {
            return self
                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
                .await;
        }

        if let Err(e) = self
            .sync_pool_events(&pool.dex.name, &pool.address, None, None, false)
            .await
        {
            tracing::error!("Failed to sync pool events for snapshot request: {e}");
        }

        if !profiler.is_initialized {
            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
                profiler.initialize(initial_sqrt_price_x96);
            } else {
                anyhow::bail!(
                    "Pool is not initialized and has no initial price, cannot bootstrap profiler"
                );
            }
        }

        let from_block = from_position
            .as_ref()
            .map(|block_position| block_position.number)
            .unwrap_or(profiler.pool.creation_block);
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
            pool.chain.clone(),
            pool.dex.clone(),
            pool.instrument_id,
            &pool.address,
            from_position.clone(),
        );

        while let Some(result) = stream.next().await {
            match result {
                Ok(event) => {
                    profiler.process(&event)?;
                }
                Err(e) => tracing::error!("Error processing event: {e}"),
            }
        }

        profiler.finalize_reporting();

        Ok((profiler, false))
    }

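    /// Builds a pool profiler by replaying mint, burn, and initialize events
    /// from the HyperSync stream, then hydrates the final state (ticks and
    /// positions) from the on-chain contract via RPC.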
    async fn construct_pool_profiler_from_hypersync_rpc(
        &self,
        mut profiler: PoolProfiler,
        from_position: Option<BlockPosition>,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        tracing::info!(
            "Constructing pool profiler from hypersync stream and RPC final state querying"
        );
        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let initialize_event_signature =
            if let Some(initialize_event) = &dex_extended.initialize_event {
                initialize_event.as_ref()
            } else {
                anyhow::bail!(
                    "DEX {} does not have initialize event set",
                    &profiler.pool.dex.name
                );
            };
        // Decode hex event signatures (with or without a `0x` prefix) into raw bytes.
        let decode_sig = |sig: &str| hex::decode(sig.strip_prefix("0x").unwrap_or(sig));
        let mint_sig_bytes = decode_sig(mint_event_signature)?;
        let burn_sig_bytes = decode_sig(burn_event_signature)?;
        let initialize_sig_bytes = decode_sig(initialize_event_signature)?;

        let from_block = from_position
            .map(|block_position| block_position.number)
            .unwrap_or(profiler.pool.creation_block);
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        tracing::info!(
            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
            profiler.pool.address,
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                from_block,
                None,
                &profiler.pool.address,
                vec![
                    mint_event_signature,
                    burn_event_signature,
                    initialize_event_signature,
                ],
            )
            .await;
        tokio::pin!(pool_events_stream);

        while let Some(log) = pool_events_stream.next().await {
            let event_sig_bytes = extract_event_signature_bytes(&log)?;

            if event_sig_bytes == initialize_sig_bytes {
                let initialize_event = dex_extended.parse_initialize_event(log)?;
                profiler.initialize(initialize_event.sqrt_price_x96);
                self.cache
                    .database
                    .as_ref()
                    .unwrap()
                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
                    .await?;
            } else if event_sig_bytes == mint_sig_bytes {
                let mint_event = dex_extended.parse_mint_event(log)?;
                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => tracing::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes {
                let burn_event = dex_extended.parse_burn_event(log)?;
                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => tracing::error!("Failed to process burn event: {e}"),
                }
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                tracing::error!(
                    "Unexpected event signature in construct_pool_profiler_from_hypersync_rpc: {} for log {:?}",
                    event_signature,
                    log
                );
            }
        }

        profiler.finalize_reporting();

        match self.get_on_chain_snapshot(&profiler).await {
            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
            Err(e) => tracing::error!(
                "Failed to restore from on-chain snapshot: {e}. Sending non-hydrated state to client."
            ),
        }

        Ok((profiler, true))
    }

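    /// Validates the profiler state against the on-chain contract state and,
    /// when valid, marks the corresponding snapshot as valid in the database.
    ///
    /// When `already_validated` is `true` the on-chain comparison is skipped.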
    pub async fn check_snapshot_validity(
        &self,
        profiler: &PoolProfiler,
        already_validated: bool,
    ) -> anyhow::Result<bool> {
        let (is_valid, block_position) = if already_validated {
            tracing::info!("Snapshot already validated from RPC, skipping on-chain comparison");
            let last_event = profiler
                .last_processed_event
                .clone()
                .expect("Profiler should have last_processed_event");
            (true, last_event)
        } else {
            match self.get_on_chain_snapshot(profiler).await {
                Ok(on_chain_snapshot) => {
                    tracing::info!("Comparing profiler state with on-chain state...");
                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
                    if !valid {
                        tracing::error!(
                            "Pool profiler state does NOT match on-chain smart contract state"
                        );
                    }
                    (valid, on_chain_snapshot.block_position)
                }
                Err(e) => {
                    tracing::error!("Failed to check snapshot validity: {e}");
                    return Ok(false);
                }
            }
        };

        if is_valid {
            if let Some(cache_database) = &self.cache.database {
                cache_database
                    .mark_pool_snapshot_valid(
                        profiler.pool.chain.chain_id,
                        &profiler.pool.address,
                        block_position.number,
                        block_position.transaction_index,
                        block_position.log_index,
                    )
                    .await?;
                tracing::info!("Marked pool profiler snapshot as valid");
            }
        }

        Ok(is_valid)
    }

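    /// Fetches the current on-chain snapshot for the profiler's pool.
    ///
    /// Currently only Uniswap V3 pools are supported.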
    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
        if profiler.pool.dex.name == DexType::UniswapV3 {
            let last_processed_event = profiler
                .last_processed_event
                .clone()
                .expect("We expect at least one processed event in the pool");
            let on_chain_snapshot = self
                .univ3_pool
                .fetch_snapshot(
                    &profiler.pool.address,
                    profiler.pool.instrument_id,
                    profiler.get_active_tick_values().as_slice(),
                    &profiler.get_all_position_keys(),
                    last_processed_event,
                )
                .await?;

            Ok(on_chain_snapshot)
        } else {
            anyhow::bail!(
                "Fetching on-chain snapshot for DEX protocol {} is not supported yet",
                profiler.pool.dex.name
            )
        }
    }

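    /// Replays historical pool events from the database as [`DataEvent`]s so
    /// downstream consumers can rehydrate state. A no-op when no database is
    /// configured.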
    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
        if let Some(database) = &self.cache.database {
            tracing::info!(
                "Replaying historical events for pool {} to hydrate profiler",
                pool.instrument_id
            );

            let mut event_stream = database.stream_pool_events(
                self.chain.clone(),
                dex.clone(),
                pool.instrument_id,
                &pool.address,
                None,
            );
            let mut event_count = 0;

            while let Some(event_result) = event_stream.next().await {
                match event_result {
                    Ok(event) => {
                        let data_event = match event {
                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
                            DexPoolData::LiquidityUpdate(update) => {
                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
                            }
                            DexPoolData::FeeCollect(collect) => {
                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
                            }
                            DexPoolData::Flash(flash) => {
                                DataEvent::DeFi(DefiData::PoolFlash(flash))
                            }
                        };
                        self.send_data(data_event);
                        event_count += 1;
                    }
                    Err(e) => {
                        tracing::error!(
                            "Error streaming event for pool {}: {e}",
                            pool.instrument_id
                        );
                    }
                }
            }

            tracing::info!(
                "Replayed {event_count} historical events for pool {}",
                pool.instrument_id
            );
        } else {
            tracing::debug!(
                "No database available, skipping event replay for pool {}",
                pool.instrument_id
            );
        }

        Ok(())
    }

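    /// Determines the starting block for syncing: the configured `from_block`,
    /// else the earliest creation block among registered DEXes, else zero.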
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

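    /// Returns the extended DEX definition for the given identifier, erroring
    /// if the DEX is not registered or is unknown for the configured chain.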
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("DEX {dex_id} does not exist for chain {}", self.chain.name),
        }
    }

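    /// Returns the cached pool for the given address, erroring if the pool is
    /// not registered.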
    pub fn get_pool(&self, pool_address: &Address) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_address) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_address} is not registered"),
        }
    }

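    /// Sends a data event to the consumer over the data channel, logging an
    /// error if no channel is configured or the send fails.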
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            tracing::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                tracing::error!("Failed to send data: {e}");
            }
        } else {
            tracing::error!("No data event channel for sending data");
        }
    }

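    /// Disconnects the data client by shutting down the HyperSync client.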
    pub async fn disconnect(&mut self) {
        self.hypersync_client.disconnect().await;
    }
}