// nautilus_blockchain/services/pool_discovery.rs

use std::{cmp::max, collections::HashSet};

use alloy::primitives::Address;
use futures_util::StreamExt;
use nautilus_model::defi::{
    SharedDex,
    amm::Pool,
    chain::SharedChain,
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
    token::Token,
};
use thousands::Separable;
use tokio_util::sync::CancellationToken;

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::erc20::Erc20Contract,
    events::pool_created::PoolCreatedEvent,
    exchanges::extended::DexExtended,
    hypersync::{client::HyperSyncClient, helpers::extract_block_number},
};
38
/// Number of blocks processed between sync progress-report log lines.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
/// Number of pool-created events buffered before a batched database write.
const POOL_DB_BATCH_SIZE: usize = 2000;
41
/// Removes control characters from `s`, keeping tab, newline, and carriage return.
///
/// Used to scrub token names/symbols and error messages before database storage.
/// Filters the full Unicode `Cc` category — including NUL, C0 controls, DEL
/// (U+007F), and C1 controls (U+0080–U+009F); the previous `>= ' '` comparison
/// let DEL and C1 controls through.
fn sanitize_string(s: String) -> String {
    s.chars()
        .filter(|c| !c.is_control() || matches!(c, '\t' | '\n' | '\r'))
        .collect()
}
56
/// Discovers DEX liquidity pools by syncing pool-creation events from the blockchain.
#[derive(Debug)]
pub struct PoolDiscoveryService<'a> {
    // The blockchain chain this service syncs against.
    chain: SharedChain,
    // Cache for tokens, pools, and per-DEX sync checkpoints (in-memory + optional database).
    cache: &'a mut BlockchainCache,
    // Contract interface used to batch-fetch ERC-20 token metadata over RPC.
    erc20_contract: &'a Erc20Contract,
    // Client used to stream historical pool-created contract events.
    hypersync_client: &'a HyperSyncClient,
    // Token used to cancel an in-progress sync.
    cancellation_token: CancellationToken,
    // Data client configuration (e.g. multicall batch sizing).
    config: BlockchainDataClientConfig,
}
76
77impl<'a> PoolDiscoveryService<'a> {
    /// Creates a new [`PoolDiscoveryService`] from its dependencies.
    ///
    /// Takes a mutable borrow of the cache for the lifetime of the service, so
    /// only one discovery service can use a given cache at a time.
    #[must_use]
    pub const fn new(
        chain: SharedChain,
        cache: &'a mut BlockchainCache,
        erc20_contract: &'a Erc20Contract,
        hypersync_client: &'a HyperSyncClient,
        cancellation_token: CancellationToken,
        config: BlockchainDataClientConfig,
    ) -> Self {
        Self {
            chain,
            cache,
            erc20_contract,
            hypersync_client,
            cancellation_token,
            config,
        }
    }
97
98 pub async fn sync_pools(
108 &mut self,
109 dex: &DexExtended,
110 from_block: u64,
111 to_block: Option<u64>,
112 reset: bool,
113 ) -> anyhow::Result<()> {
114 let (last_synced_block, effective_from_block) = if reset {
116 (None, from_block)
117 } else {
118 let last_synced_block = self.cache.get_dex_last_synced_block(&dex.dex.name).await?;
119 let effective_from_block = last_synced_block
120 .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
121 (last_synced_block, effective_from_block)
122 };
123
124 let to_block = match to_block {
125 Some(block) => block,
126 None => self.hypersync_client.current_block().await,
127 };
128
129 if effective_from_block > to_block {
131 tracing::info!(
132 "DEX {} already synced to block {} (current: {}), skipping sync",
133 dex.dex.name,
134 last_synced_block.unwrap_or(0).separate_with_commas(),
135 to_block.separate_with_commas()
136 );
137 return Ok(());
138 }
139
140 let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
141 tracing::info!(
142 "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
143 effective_from_block.separate_with_commas(),
144 to_block.separate_with_commas(),
145 total_blocks.separate_with_commas(),
146 if let Some(last_synced) = last_synced_block {
147 format!(
148 " - resuming from last synced block {}",
149 last_synced.separate_with_commas()
150 )
151 } else {
152 String::new()
153 },
154 );
155 tracing::info!(
156 "Syncing {} pool creation events from factory contract {} on chain {}",
157 dex.dex.name,
158 dex.factory,
159 self.chain.name
160 );
161
162 if let Err(e) = self.cache.toggle_performance_settings(true).await {
164 tracing::warn!("Failed to enable performance settings: {e}");
165 }
166
167 let mut metrics = BlockchainSyncReporter::new(
168 BlockchainSyncReportItems::PoolCreatedEvents,
169 effective_from_block,
170 total_blocks,
171 BLOCKS_PROCESS_IN_SYNC_REPORT,
172 );
173
174 let factory_address = &dex.factory;
175 let pair_created_event_signature = dex.pool_created_event.as_ref();
176 let pools_stream = self
177 .hypersync_client
178 .request_contract_events_stream(
179 effective_from_block,
180 Some(to_block),
181 factory_address,
182 vec![pair_created_event_signature],
183 )
184 .await;
185
186 tokio::pin!(pools_stream);
187
188 let token_rpc_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
190 let mut token_rpc_buffer: HashSet<Address> = HashSet::new();
191
192 let mut token_db_buffer: Vec<Token> = Vec::new();
194 let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();
195
196 let mut last_block_saved = effective_from_block;
197
198 let mut total_discovered = 0;
200 let mut total_skipped_exists = 0;
201 let mut total_skipped_invalid_tokens = 0;
202 let mut total_saved = 0;
203
204 let cancellation_token = self.cancellation_token.clone();
205 let sync_result = tokio::select! {
206 () = cancellation_token.cancelled() => {
207 tracing::info!("Exchange pool sync cancelled");
208 Err(anyhow::anyhow!("Sync cancelled"))
209 }
210
211 result = async {
212 while let Some(log) = pools_stream.next().await {
213 let block_number = extract_block_number(&log)?;
214 let blocks_progress = block_number - last_block_saved;
215 last_block_saved = block_number;
216
217 let pool = dex.parse_pool_created_event_hypersync(log)?;
218 total_discovered += 1;
219
220 if self.cache.get_pool(&pool.pool_identifier).is_some() {
221 total_skipped_exists += 1;
223 continue;
224 }
225
226 if self.cache.is_invalid_token(&pool.token0)
227 || self.cache.is_invalid_token(&pool.token1)
228 {
229 total_skipped_invalid_tokens += 1;
231 continue;
232 }
233
234 if self.cache.get_token(&pool.token0).is_none() {
236 token_rpc_buffer.insert(pool.token0);
237 }
238 if self.cache.get_token(&pool.token1).is_none() {
239 token_rpc_buffer.insert(pool.token1);
240 }
241
242 pool_events_buffer.push(pool);
244
245 if token_rpc_buffer.len() >= token_rpc_batch_size {
247 let fetched_tokens = self
248 .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
249 .await?;
250
251 token_db_buffer.extend(fetched_tokens);
253 }
254
255 if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
258 if !token_rpc_buffer.is_empty() {
260 let fetched_tokens = self
261 .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
262 .await?;
263 token_db_buffer.extend(fetched_tokens);
264 }
265
266 if !token_db_buffer.is_empty() {
268 self.cache
269 .add_tokens_batch(std::mem::take(&mut token_db_buffer))
270 .await?;
271 }
272
273 let pools = self
275 .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
276 .await?;
277 total_saved += pools.len();
278 self.cache.add_pools_batch(pools).await?;
279 }
280
281 metrics.update(blocks_progress as usize);
282 if metrics.should_log_progress(block_number, to_block) {
284 metrics.log_progress(block_number);
285 }
286 }
287
288 if !token_rpc_buffer.is_empty() {
291 let fetched_tokens = self
292 .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
293 .await?;
294 token_db_buffer.extend(fetched_tokens);
295 }
296
297 if !token_db_buffer.is_empty() {
299 self.cache
300 .add_tokens_batch(std::mem::take(&mut token_db_buffer))
301 .await?;
302 }
303
304 if !pool_events_buffer.is_empty() {
306 let pools = self
307 .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
308 .await?;
309 total_saved += pools.len();
310 self.cache.add_pools_batch(pools).await?;
311 }
312
313 metrics.log_final_stats();
314
315 self.cache
317 .update_dex_last_synced_block(&dex.dex.name, to_block)
318 .await?;
319
320 tracing::info!(
321 "Successfully synced DEX {} pools up to block {} | Summary: discovered={}, saved={}, skipped_exists={}, skipped_invalid_tokens={}",
322 dex.dex.name,
323 to_block.separate_with_commas(),
324 total_discovered,
325 total_saved,
326 total_skipped_exists,
327 total_skipped_invalid_tokens
328 );
329
330 Ok(())
331 } => result
332 };
333
334 sync_result?;
335
336 if let Err(e) = self.cache.toggle_performance_settings(false).await {
338 tracing::warn!("Failed to restore default settings: {e}");
339 }
340
341 Ok(())
342 }
343
344 async fn fetch_and_cache_tokens_in_memory(
353 &mut self,
354 token_buffer: &mut HashSet<Address>,
355 ) -> anyhow::Result<Vec<Token>> {
356 let batch_addresses: Vec<Address> = token_buffer.drain().collect();
357 let token_infos = self
358 .erc20_contract
359 .batch_fetch_token_info(&batch_addresses)
360 .await?;
361
362 let mut valid_tokens = Vec::new();
363
364 for (token_address, token_info) in token_infos {
365 match token_info {
366 Ok(token_info) => {
367 let sanitized_name = sanitize_string(token_info.name);
369 let sanitized_symbol = sanitize_string(token_info.symbol);
370
371 let token = Token::new(
372 self.chain.clone(),
373 token_address,
374 sanitized_name,
375 sanitized_symbol,
376 token_info.decimals,
377 );
378
379 self.cache.insert_token_in_memory(token.clone());
381
382 valid_tokens.push(token);
384 }
385 Err(token_info_error) => {
386 self.cache.insert_invalid_token_in_memory(token_address);
387 if let Some(database) = &self.cache.database {
388 let sanitized_error = sanitize_string(token_info_error.to_string());
389 database
390 .add_invalid_token(
391 self.chain.chain_id,
392 &token_address,
393 &sanitized_error,
394 )
395 .await?;
396 }
397 }
398 }
399 }
400
401 Ok(valid_tokens)
402 }
403
404 async fn construct_pools_batch(
413 &mut self,
414 pool_events: &mut Vec<PoolCreatedEvent>,
415 dex: &SharedDex,
416 ) -> anyhow::Result<Vec<Pool>> {
417 let mut pools = Vec::with_capacity(pool_events.len());
418
419 for pool_event in pool_events.drain(..) {
420 let token0 = match self.cache.get_token(&pool_event.token0) {
422 Some(token) => token.clone(),
423 None => {
424 if !self.cache.is_invalid_token(&pool_event.token0) {
425 tracing::warn!(
426 "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
427 pool_event.pool_address,
428 pool_event.token0
429 );
430 }
431 continue;
432 }
433 };
434
435 let token1 = match self.cache.get_token(&pool_event.token1) {
436 Some(token) => token.clone(),
437 None => {
438 if !self.cache.is_invalid_token(&pool_event.token1) {
439 tracing::warn!(
440 "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
441 pool_event.pool_address,
442 pool_event.token1
443 );
444 }
445 continue;
446 }
447 };
448
449 let mut pool = Pool::new(
450 self.chain.clone(),
451 dex.clone(),
452 pool_event.pool_address,
453 pool_event.pool_identifier,
454 pool_event.block_number,
455 token0,
456 token1,
457 pool_event.fee,
458 pool_event.tick_spacing,
459 nautilus_core::UnixNanos::default(),
460 );
461
462 if let Some(hooks) = pool_event.hooks {
464 pool.set_hooks(hooks);
465 }
466
467 if let (Some(sqrt_price_x96), Some(tick)) = (pool_event.sqrt_price_x96, pool_event.tick)
469 {
470 pool.initialize(sqrt_price_x96, tick);
471 }
472
473 pools.push(pool);
474 }
475
476 Ok(pools)
477 }
478}