nautilus_blockchain/hypersync/
client.rs1use std::sync::Arc;
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22 net_types::{BlockField, BlockSelection, FieldSelection, Query},
23 simple_types::Log,
24};
25use nautilus_common::runtime::get_runtime;
26use nautilus_model::{
27 defi::{Block, DexType, SharedChain},
28 identifiers::InstrumentId,
29};
30use reqwest::Url;
31
32use crate::{
33 exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
34 rpc::types::BlockchainMessage,
35};
36
/// Delay, in milliseconds, between successive chain-height polls while the
/// block subscription task waits for the next block to become available.
const BLOCK_POLLING_INTERVAL_MS: u64 = 50;

/// Upper bound, in seconds, on a single HyperSync block request issued by the
/// block subscription task; a request that times out is retried.
const HYPERSYNC_REQUEST_TIMEOUT_SECS: u64 = 30;

/// Grace period, in seconds, that `disconnect` grants the blocks task to
/// finish on its own before the task is aborted.
const DISCONNECT_TIMEOUT_SECS: u64 = 5;
47
48#[derive(Debug)]
50pub struct HyperSyncClient {
51 chain: SharedChain,
53 client: Arc<hypersync_client::Client>,
55 blocks_task: Option<tokio::task::JoinHandle<()>>,
57 blocks_cancellation_token: Option<tokio_util::sync::CancellationToken>,
59 tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
61 pool_addresses: AHashMap<InstrumentId, Address>,
63 cancellation_token: tokio_util::sync::CancellationToken,
65}
66
67impl HyperSyncClient {
68 #[must_use]
77 pub fn new(
78 chain: SharedChain,
79 tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
80 cancellation_token: tokio_util::sync::CancellationToken,
81 ) -> Self {
82 let mut config = hypersync_client::ClientConfig::default();
83 let hypersync_url =
84 Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
85 config.url = hypersync_url.to_string();
86 config.api_token = std::env::var("ENVIO_API_TOKEN")
87 .expect("ENVIO_API_TOKEN environment variable must be set");
88 let client = hypersync_client::Client::new(config)
89 .expect("Failed to create HyperSync client - check ENVIO_API_TOKEN is a valid UUID");
90
91 Self {
92 chain,
93 client: Arc::new(client),
94 blocks_task: None,
95 blocks_cancellation_token: None,
96 tx,
97 pool_addresses: AHashMap::new(),
98 cancellation_token,
99 }
100 }
101
102 #[must_use]
103 pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
104 self.pool_addresses.get(&instrument_id)
105 }
106
107 pub fn process_block_dex_contract_events(
113 &mut self,
114 dex: &DexType,
115 block: u64,
116 contract_addresses: Vec<Address>,
117 swap_event_encoded_signature: String,
118 mint_event_encoded_signature: String,
119 burn_event_encoded_signature: String,
120 ) {
121 let topics = vec![
122 swap_event_encoded_signature.as_str(),
123 &mint_event_encoded_signature.as_str(),
124 &burn_event_encoded_signature.as_str(),
125 ];
126 let query = Self::construct_contract_events_query(
127 block,
128 Some(block + 1),
129 contract_addresses,
130 topics,
131 );
132 let tx = if let Some(tx) = &self.tx {
133 tx.clone()
134 } else {
135 tracing::error!("Hypersync client channel should have been initialized");
136 return;
137 };
138 let client = self.client.clone();
139 let dex_extended =
140 get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
141 let cancellation_token = self.cancellation_token.clone();
142
143 let _task = get_runtime().spawn(async move {
144 let mut rx = match client.stream(query, Default::default()).await {
145 Ok(rx) => rx,
146 Err(e) => {
147 tracing::error!("Failed to create DEX event stream: {e}");
148 return;
149 }
150 };
151
152 loop {
153 tokio::select! {
154 () = cancellation_token.cancelled() => {
155 tracing::debug!("DEX event processing task received cancellation signal");
156 break;
157 }
158 response = rx.recv() => {
159 let Some(response) = response else {
160 break;
161 };
162
163 let response = match response {
164 Ok(resp) => resp,
165 Err(e) => {
166 tracing::error!("Failed to receive DEX event stream response: {e}");
167 break;
168 }
169 };
170
171 for batch in response.data.logs {
172 for log in batch {
173 let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
174 Some(log_argument) => {
175 format!("0x{}", hex::encode(log_argument.as_ref()))
176 }
177 None => continue,
178 };
179 if event_signature == swap_event_encoded_signature {
180 match dex_extended.parse_swap_event(log.clone()) {
181 Ok(swap_event) => {
182 if let Err(e) =
183 tx.send(BlockchainMessage::SwapEvent(swap_event))
184 {
185 tracing::error!("Failed to send swap event: {e}");
186 }
187 }
188 Err(e) => {
189 tracing::error!(
190 "Failed to parse swap with error '{e:?}' for event: {log:?}",
191 );
192 continue;
193 }
194 }
195 } else if event_signature == mint_event_encoded_signature {
196 match dex_extended.parse_mint_event(log.clone()) {
197 Ok(swap_event) => {
198 if let Err(e) =
199 tx.send(BlockchainMessage::MintEvent(swap_event))
200 {
201 tracing::error!("Failed to send mint event: {e}");
202 }
203 }
204 Err(e) => {
205 tracing::error!(
206 "Failed to parse mint with error '{e:?}' for event: {log:?}",
207 );
208 continue;
209 }
210 }
211 } else if event_signature == burn_event_encoded_signature {
212 match dex_extended.parse_burn_event(log.clone()) {
213 Ok(swap_event) => {
214 if let Err(e) =
215 tx.send(BlockchainMessage::BurnEvent(swap_event))
216 {
217 tracing::error!("Failed to send burn event: {e}");
218 }
219 }
220 Err(e) => {
221 tracing::error!(
222 "Failed to parse burn with error '{e:?}' for event: {log:?}",
223 );
224 continue;
225 }
226 }
227 } else {
228 tracing::error!("Unknown event signature: {event_signature}");
229 continue;
230 }
231 }
232 }
233 }
234 }
235 }
236 });
237
238 }
241
242 pub async fn request_contract_events_stream(
248 &self,
249 from_block: u64,
250 to_block: Option<u64>,
251 contract_address: &Address,
252 topics: Vec<&str>,
253 ) -> impl Stream<Item = Log> + use<> {
254 let query = Self::construct_contract_events_query(
255 from_block,
256 to_block,
257 vec![*contract_address],
258 topics,
259 );
260
261 let mut rx = self
262 .client
263 .clone()
264 .stream(query, Default::default())
265 .await
266 .expect("Failed to create stream");
267
268 async_stream::stream! {
269 while let Some(response) = rx.recv().await {
270 let response = response.unwrap();
271
272 for batch in response.data.logs {
273 for log in batch {
274 yield log
275 }
276 }
277 }
278 }
279 }
280
281 pub async fn disconnect(&mut self) {
283 tracing::debug!("Disconnecting HyperSync client");
284 self.cancellation_token.cancel();
285
286 if let Some(mut task) = self.blocks_task.take() {
288 match tokio::time::timeout(
289 std::time::Duration::from_secs(DISCONNECT_TIMEOUT_SECS),
290 &mut task,
291 )
292 .await
293 {
294 Ok(Ok(())) => {
295 tracing::debug!("Blocks task completed gracefully");
296 }
297 Ok(Err(e)) => {
298 tracing::error!("Error awaiting blocks task: {e}");
299 }
300 Err(_) => {
301 tracing::warn!(
302 "Blocks task did not complete within {DISCONNECT_TIMEOUT_SECS}s timeout, \
303 aborting task (this is expected if Hypersync long-poll was in progress)"
304 );
305 task.abort();
306 let _ = task.await;
307 }
308 }
309 }
310
311 tracing::debug!("HyperSync client disconnected");
314 }
315
316 pub async fn current_block(&self) -> u64 {
322 self.client.get_height().await.unwrap()
323 }
324
325 pub async fn request_blocks_stream(
331 &self,
332 from_block: u64,
333 to_block: Option<u64>,
334 ) -> impl Stream<Item = Block> {
335 let query = Self::construct_block_query(from_block, to_block);
336 let mut rx = self
337 .client
338 .clone()
339 .stream(query, Default::default())
340 .await
341 .unwrap();
342
343 let chain = self.chain.name;
344
345 async_stream::stream! {
346 while let Some(response) = rx.recv().await {
347 let response = response.unwrap();
348 for batch in response.data.blocks {
349 for received_block in batch {
350 let block = transform_hypersync_block(chain, received_block).unwrap();
351 yield block
352 }
353 }
354 }
355 }
356 }
357
358 pub fn subscribe_blocks(&mut self) {
364 if self.blocks_task.is_some() {
365 return;
366 }
367
368 let chain = self.chain.name;
369 let client = self.client.clone();
370 let tx = if let Some(tx) = &self.tx {
371 tx.clone()
372 } else {
373 tracing::error!("Hypersync client channel should have been initialized");
374 return;
375 };
376
377 let blocks_token = self.cancellation_token.child_token();
379 let cancellation_token = blocks_token.clone();
380 self.blocks_cancellation_token = Some(blocks_token);
381
382 let task = get_runtime().spawn(async move {
383 tracing::debug!("Starting task 'blocks_feed");
384
385 let current_block_height = client.get_height().await.unwrap();
386 let mut query = Self::construct_block_query(current_block_height, None);
387
388 loop {
389 tokio::select! {
390 () = cancellation_token.cancelled() => {
391 tracing::debug!("Blocks subscription task received cancellation signal");
392 break;
393 }
394 result = tokio::time::timeout(
395 std::time::Duration::from_secs(HYPERSYNC_REQUEST_TIMEOUT_SECS),
396 client.get(&query)
397 ) => {
398 let response = match result {
399 Ok(Ok(resp)) => resp,
400 Ok(Err(e)) => {
401 tracing::error!("Hypersync request failed: {e}");
402 break;
403 }
404 Err(_) => {
405 tracing::warn!("Hypersync request timed out after {HYPERSYNC_REQUEST_TIMEOUT_SECS}s, retrying...");
406 continue;
407 }
408 };
409
410 for batch in response.data.blocks {
411 for received_block in batch {
412 let block = transform_hypersync_block(chain, received_block).unwrap();
413 let msg = BlockchainMessage::Block(block);
414 if let Err(e) = tx.send(msg) {
415 log::error!("Error sending message: {e}");
416 }
417 }
418 }
419
420 if let Some(archive_block_height) = response.archive_height
421 && archive_block_height < response.next_block
422 {
423 while client.get_height().await.unwrap() < response.next_block {
424 tokio::select! {
425 () = cancellation_token.cancelled() => {
426 tracing::debug!("Blocks subscription task received cancellation signal during polling");
427 return;
428 }
429 () = tokio::time::sleep(std::time::Duration::from_millis(
430 BLOCK_POLLING_INTERVAL_MS,
431 )) => {}
432 }
433 }
434 }
435
436 query.from_block = response.next_block;
437 }
438 }
439 }
440 });
441
442 self.blocks_task = Some(task);
443 }
444
445 fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
447 Query {
448 from_block,
449 to_block,
450 blocks: vec![BlockSelection::default()],
451 field_selection: FieldSelection {
452 block: BlockField::all(),
453 ..Default::default()
454 },
455 ..Default::default()
456 }
457 }
458
459 fn construct_contract_events_query(
460 from_block: u64,
461 to_block: Option<u64>,
462 contract_addresses: Vec<Address>,
463 topics: Vec<&str>,
464 ) -> Query {
465 let mut query_value = serde_json::json!({
466 "from_block": from_block,
467 "logs": [{
468 "topics": [topics],
469 "address": contract_addresses
470 }],
471 "field_selection": {
472 "log": [
473 "block_number",
474 "transaction_hash",
475 "transaction_index",
476 "log_index",
477 "address",
478 "data",
479 "topic0",
480 "topic1",
481 "topic2",
482 "topic3",
483 ]
484 }
485 });
486
487 if let Some(to_block) = to_block
488 && let Some(obj) = query_value.as_object_mut()
489 {
490 obj.insert("to_block".to_string(), serde_json::json!(to_block));
491 }
492
493 serde_json::from_value(query_value).unwrap()
494 }
495
496 pub async fn unsubscribe_blocks(&mut self) {
498 if let Some(task) = self.blocks_task.take() {
499 if let Some(token) = self.blocks_cancellation_token.take() {
501 token.cancel();
502 }
503 if let Err(e) = task.await {
504 tracing::error!("Error awaiting blocks task during unsubscribe: {e}");
505 }
506 tracing::debug!("Unsubscribed from blocks");
507 }
508 }
509}