nautilus_blockchain/hypersync/
client.rs1use std::{collections::BTreeSet, sync::Arc};
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22 net_types::{BlockSelection, FieldSelection, Query},
23 simple_types::Log,
24};
25use nautilus_common::runtime::get_runtime;
26use nautilus_model::{
27 defi::{Block, DexType, SharedChain},
28 identifiers::InstrumentId,
29};
30use reqwest::Url;
31
32use crate::{
33 exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
34 rpc::types::BlockchainMessage,
35};
36
37const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
40
41const HYPERSYNC_REQUEST_TIMEOUT_SECS: u64 = 30;
43
44const DISCONNECT_TIMEOUT_SECS: u64 = 5;
47
48#[derive(Debug)]
50pub struct HyperSyncClient {
51 chain: SharedChain,
53 client: Arc<hypersync_client::Client>,
55 blocks_task: Option<tokio::task::JoinHandle<()>>,
57 blocks_cancellation_token: Option<tokio_util::sync::CancellationToken>,
59 tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
61 pool_addresses: AHashMap<InstrumentId, Address>,
63 cancellation_token: tokio_util::sync::CancellationToken,
65}
66
67impl HyperSyncClient {
68 #[must_use]
74 pub fn new(
75 chain: SharedChain,
76 tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
77 cancellation_token: tokio_util::sync::CancellationToken,
78 ) -> Self {
79 let mut config = hypersync_client::ClientConfig::default();
80 let hypersync_url =
81 Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
82 config.url = Some(hypersync_url);
83 let client = hypersync_client::Client::new(config).unwrap();
84
85 Self {
86 chain,
87 client: Arc::new(client),
88 blocks_task: None,
89 blocks_cancellation_token: None,
90 tx,
91 pool_addresses: AHashMap::new(),
92 cancellation_token,
93 }
94 }
95
96 #[must_use]
97 pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
98 self.pool_addresses.get(&instrument_id)
99 }
100
101 pub fn process_block_dex_contract_events(
107 &mut self,
108 dex: &DexType,
109 block: u64,
110 contract_addresses: Vec<Address>,
111 swap_event_encoded_signature: String,
112 mint_event_encoded_signature: String,
113 burn_event_encoded_signature: String,
114 ) {
115 let topics = vec![
116 swap_event_encoded_signature.as_str(),
117 &mint_event_encoded_signature.as_str(),
118 &burn_event_encoded_signature.as_str(),
119 ];
120 let query = Self::construct_contract_events_query(
121 block,
122 Some(block + 1),
123 contract_addresses,
124 topics,
125 );
126 let tx = if let Some(tx) = &self.tx {
127 tx.clone()
128 } else {
129 tracing::error!("Hypersync client channel should have been initialized");
130 return;
131 };
132 let client = self.client.clone();
133 let dex_extended =
134 get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
135 let cancellation_token = self.cancellation_token.clone();
136
137 let _task = get_runtime().spawn(async move {
138 let mut rx = match client.stream(query, Default::default()).await {
139 Ok(rx) => rx,
140 Err(e) => {
141 tracing::error!("Failed to create DEX event stream: {e}");
142 return;
143 }
144 };
145
146 loop {
147 tokio::select! {
148 () = cancellation_token.cancelled() => {
149 tracing::debug!("DEX event processing task received cancellation signal");
150 break;
151 }
152 response = rx.recv() => {
153 let Some(response) = response else {
154 break;
155 };
156
157 let response = match response {
158 Ok(resp) => resp,
159 Err(e) => {
160 tracing::error!("Failed to receive DEX event stream response: {e}");
161 break;
162 }
163 };
164
165 for batch in response.data.logs {
166 for log in batch {
167 let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
168 Some(log_argument) => {
169 format!("0x{}", hex::encode(log_argument.as_ref()))
170 }
171 None => continue,
172 };
173 if event_signature == swap_event_encoded_signature {
174 match dex_extended.parse_swap_event(log.clone()) {
175 Ok(swap_event) => {
176 if let Err(e) =
177 tx.send(BlockchainMessage::SwapEvent(swap_event))
178 {
179 tracing::error!("Failed to send swap event: {e}");
180 }
181 }
182 Err(e) => {
183 tracing::error!(
184 "Failed to parse swap with error '{e:?}' for event: {log:?}",
185 );
186 continue;
187 }
188 }
189 } else if event_signature == mint_event_encoded_signature {
190 match dex_extended.parse_mint_event(log.clone()) {
191 Ok(swap_event) => {
192 if let Err(e) =
193 tx.send(BlockchainMessage::MintEvent(swap_event))
194 {
195 tracing::error!("Failed to send mint event: {e}");
196 }
197 }
198 Err(e) => {
199 tracing::error!(
200 "Failed to parse mint with error '{e:?}' for event: {log:?}",
201 );
202 continue;
203 }
204 }
205 } else if event_signature == burn_event_encoded_signature {
206 match dex_extended.parse_burn_event(log.clone()) {
207 Ok(swap_event) => {
208 if let Err(e) =
209 tx.send(BlockchainMessage::BurnEvent(swap_event))
210 {
211 tracing::error!("Failed to send burn event: {e}");
212 }
213 }
214 Err(e) => {
215 tracing::error!(
216 "Failed to parse burn with error '{e:?}' for event: {log:?}",
217 );
218 continue;
219 }
220 }
221 } else {
222 tracing::error!("Unknown event signature: {event_signature}");
223 continue;
224 }
225 }
226 }
227 }
228 }
229 }
230 });
231
232 }
235
236 pub async fn request_contract_events_stream(
242 &self,
243 from_block: u64,
244 to_block: Option<u64>,
245 contract_address: &Address,
246 topics: Vec<&str>,
247 ) -> impl Stream<Item = Log> + use<> {
248 let query = Self::construct_contract_events_query(
249 from_block,
250 to_block,
251 vec![contract_address.clone()],
252 topics,
253 );
254
255 let mut rx = self
256 .client
257 .clone()
258 .stream(query, Default::default())
259 .await
260 .expect("Failed to create stream");
261
262 async_stream::stream! {
263 while let Some(response) = rx.recv().await {
264 let response = response.unwrap();
265
266 for batch in response.data.logs {
267 for log in batch {
268 yield log
269 }
270 }
271 }
272 }
273 }
274
275 pub async fn disconnect(&mut self) {
277 tracing::debug!("Disconnecting HyperSync client");
278 self.cancellation_token.cancel();
279
280 if let Some(mut task) = self.blocks_task.take() {
282 match tokio::time::timeout(
283 std::time::Duration::from_secs(DISCONNECT_TIMEOUT_SECS),
284 &mut task,
285 )
286 .await
287 {
288 Ok(Ok(())) => {
289 tracing::debug!("Blocks task completed gracefully");
290 }
291 Ok(Err(e)) => {
292 tracing::error!("Error awaiting blocks task: {e}");
293 }
294 Err(_) => {
295 tracing::warn!(
296 "Blocks task did not complete within {DISCONNECT_TIMEOUT_SECS}s timeout, \
297 aborting task (this is expected if Hypersync long-poll was in progress)"
298 );
299 task.abort();
300 let _ = task.await;
301 }
302 }
303 }
304
305 tracing::debug!("HyperSync client disconnected");
308 }
309
310 pub async fn current_block(&self) -> u64 {
316 self.client.get_height().await.unwrap()
317 }
318
319 pub async fn request_blocks_stream(
325 &self,
326 from_block: u64,
327 to_block: Option<u64>,
328 ) -> impl Stream<Item = Block> {
329 let query = Self::construct_block_query(from_block, to_block);
330 let mut rx = self
331 .client
332 .clone()
333 .stream(query, Default::default())
334 .await
335 .unwrap();
336
337 let chain = self.chain.name;
338
339 async_stream::stream! {
340 while let Some(response) = rx.recv().await {
341 let response = response.unwrap();
342 for batch in response.data.blocks {
343 for received_block in batch {
344 let block = transform_hypersync_block(chain, received_block).unwrap();
345 yield block
346 }
347 }
348 }
349 }
350 }
351
352 pub fn subscribe_blocks(&mut self) {
358 if self.blocks_task.is_some() {
359 return;
360 }
361
362 let chain = self.chain.name;
363 let client = self.client.clone();
364 let tx = if let Some(tx) = &self.tx {
365 tx.clone()
366 } else {
367 tracing::error!("Hypersync client channel should have been initialized");
368 return;
369 };
370
371 let blocks_token = self.cancellation_token.child_token();
373 let cancellation_token = blocks_token.clone();
374 self.blocks_cancellation_token = Some(blocks_token);
375
376 let task = get_runtime().spawn(async move {
377 tracing::debug!("Starting task 'blocks_feed");
378
379 let current_block_height = client.get_height().await.unwrap();
380 let mut query = Self::construct_block_query(current_block_height, None);
381
382 loop {
383 tokio::select! {
384 () = cancellation_token.cancelled() => {
385 tracing::debug!("Blocks subscription task received cancellation signal");
386 break;
387 }
388 result = tokio::time::timeout(
389 std::time::Duration::from_secs(HYPERSYNC_REQUEST_TIMEOUT_SECS),
390 client.get(&query)
391 ) => {
392 let response = match result {
393 Ok(Ok(resp)) => resp,
394 Ok(Err(e)) => {
395 tracing::error!("Hypersync request failed: {e}");
396 break;
397 }
398 Err(_) => {
399 tracing::warn!("Hypersync request timed out after {HYPERSYNC_REQUEST_TIMEOUT_SECS}s, retrying...");
400 continue;
401 }
402 };
403
404 for batch in response.data.blocks {
405 for received_block in batch {
406 let block = transform_hypersync_block(chain, received_block).unwrap();
407 let msg = BlockchainMessage::Block(block);
408 if let Err(e) = tx.send(msg) {
409 log::error!("Error sending message: {e}");
410 }
411 }
412 }
413
414 if let Some(archive_block_height) = response.archive_height
415 && archive_block_height < response.next_block
416 {
417 while client.get_height().await.unwrap() < response.next_block {
418 tokio::select! {
419 () = cancellation_token.cancelled() => {
420 tracing::debug!("Blocks subscription task received cancellation signal during polling");
421 return;
422 }
423 () = tokio::time::sleep(std::time::Duration::from_millis(
424 BLOCK_POLLING_INTERVAL_MS,
425 )) => {}
426 }
427 }
428 }
429
430 query.from_block = response.next_block;
431 }
432 }
433 }
434 });
435
436 self.blocks_task = Some(task);
437 }
438
439 fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
441 let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
442 .fields
443 .iter()
444 .map(|x| x.name.clone())
445 .collect();
446
447 Query {
448 from_block,
449 to_block,
450 blocks: vec![BlockSelection::default()],
451 field_selection: FieldSelection {
452 block: all_block_fields,
453 ..Default::default()
454 },
455 ..Default::default()
456 }
457 }
458
459 fn construct_contract_events_query(
460 from_block: u64,
461 to_block: Option<u64>,
462 contract_addresses: Vec<Address>,
463 topics: Vec<&str>,
464 ) -> Query {
465 let mut query_value = serde_json::json!({
466 "from_block": from_block,
467 "logs": [{
468 "topics": [topics],
469 "address": contract_addresses
470 }],
471 "field_selection": {
472 "log": [
473 "block_number",
474 "transaction_hash",
475 "transaction_index",
476 "log_index",
477 "address",
478 "data",
479 "topic0",
480 "topic1",
481 "topic2",
482 "topic3",
483 ]
484 }
485 });
486
487 if let Some(to_block) = to_block
488 && let Some(obj) = query_value.as_object_mut()
489 {
490 obj.insert("to_block".to_string(), serde_json::json!(to_block));
491 }
492
493 serde_json::from_value(query_value).unwrap()
494 }
495
496 pub async fn unsubscribe_blocks(&mut self) {
498 if let Some(task) = self.blocks_task.take() {
499 if let Some(token) = self.blocks_cancellation_token.take() {
501 token.cancel();
502 }
503 if let Err(e) = task.await {
504 tracing::error!("Error awaiting blocks task during unsubscribe: {e}");
505 }
506 tracing::debug!("Unsubscribed from blocks");
507 }
508 }
509}