nautilus_blockchain/hypersync/
client.rs1use std::{collections::BTreeSet, sync::Arc};
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22 net_types::{BlockSelection, FieldSelection, Query},
23 simple_types::Log,
24};
25use nautilus_common::runtime::get_runtime;
26use nautilus_model::{
27 defi::{Block, DexType, SharedChain},
28 identifiers::InstrumentId,
29};
30use reqwest::Url;
31
32use crate::{
33 exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
34 rpc::types::BlockchainMessage,
35};
36
/// Delay, in milliseconds, between successive HyperSync height polls while the blocks
/// subscription waits for the chain to reach the next block.
const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
40
/// Client for fetching blocks and contract event logs from a HyperSync endpoint.
#[derive(Debug)]
pub struct HyperSyncClient {
    /// Chain this client is bound to; supplies the HyperSync URL and the chain name.
    chain: SharedChain,
    /// Shared handle to the underlying HyperSync client, cloned into spawned tasks.
    client: Arc<hypersync_client::Client>,
    /// Handle of the spawned block-polling task; set by `subscribe_blocks` and taken by
    /// `unsubscribe_blocks`.
    blocks_task: Option<tokio::task::JoinHandle<()>>,
    /// Optional channel on which blocks and parsed DEX events are sent.
    tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    /// Addresses looked up by instrument ID via `get_pool_address`.
    pool_addresses: AHashMap<InstrumentId, Address>,
}
55
56impl HyperSyncClient {
57 #[must_use]
63 pub fn new(
64 chain: SharedChain,
65 tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
66 ) -> Self {
67 let mut config = hypersync_client::ClientConfig::default();
68 let hypersync_url =
69 Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
70 config.url = Some(hypersync_url);
71 let client = hypersync_client::Client::new(config).unwrap();
72
73 Self {
74 chain,
75 client: Arc::new(client),
76 blocks_task: None,
77 tx,
78 pool_addresses: AHashMap::new(),
79 }
80 }
81
82 #[must_use]
83 pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
84 self.pool_addresses.get(&instrument_id)
85 }
86
87 pub async fn process_block_dex_contract_events(
93 &self,
94 dex: &DexType,
95 block: u64,
96 contract_addresses: Vec<Address>,
97 swap_event_encoded_signature: String,
98 mint_event_encoded_signature: String,
99 burn_event_encoded_signature: String,
100 ) {
101 let topics = vec![
102 swap_event_encoded_signature.as_str(),
103 &mint_event_encoded_signature.as_str(),
104 &burn_event_encoded_signature.as_str(),
105 ];
106 let query = Self::construct_contract_events_query(
107 block,
108 Some(block + 1),
109 contract_addresses,
110 topics,
111 );
112 let tx = if let Some(tx) = &self.tx {
113 tx.clone()
114 } else {
115 tracing::error!("Hypersync client channel should have been initialized");
116 return;
117 };
118 let client = self.client.clone();
119 let dex_extended =
120 get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
121
122 get_runtime().spawn(async move {
123 let mut rx = client
124 .stream(query, Default::default())
125 .await
126 .expect("Failed to create stream");
127
128 while let Some(response) = rx.recv().await {
129 let response = response.unwrap();
130
131 for batch in response.data.logs {
132 for log in batch {
133 let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
134 Some(log_argument) => {
135 format!("0x{}", hex::encode(log_argument.as_ref()))
136 }
137 None => continue,
138 };
139 if event_signature == swap_event_encoded_signature {
140 match dex_extended.parse_swap_event(log.clone()) {
141 Ok(swap_event) => {
142 if let Err(e) =
143 tx.send(BlockchainMessage::SwapEvent(swap_event))
144 {
145 tracing::error!("Failed to send swap event: {}", e);
146 }
147 }
148 Err(e) => {
149 tracing::error!(
150 "Failed to parse swap with error '{:?}' for event: {:?}",
151 e,
152 log
153 );
154 continue;
155 }
156 }
157 } else if event_signature == mint_event_encoded_signature {
158 match dex_extended.parse_mint_event(log.clone()) {
159 Ok(swap_event) => {
160 if let Err(e) =
161 tx.send(BlockchainMessage::MintEvent(swap_event))
162 {
163 tracing::error!("Failed to send mint event: {}", e);
164 }
165 }
166 Err(e) => {
167 tracing::error!(
168 "Failed to parse mint with error '{:?}' for event: {:?}",
169 e,
170 log
171 );
172 continue;
173 }
174 }
175 } else if event_signature == burn_event_encoded_signature {
176 match dex_extended.parse_burn_event(log.clone()) {
177 Ok(swap_event) => {
178 if let Err(e) =
179 tx.send(BlockchainMessage::BurnEvent(swap_event))
180 {
181 tracing::error!("Failed to send burn event: {}", e);
182 }
183 }
184 Err(e) => {
185 tracing::error!(
186 "Failed to parse burn with error '{:?}' for event: {:?}",
187 e,
188 log
189 );
190 continue;
191 }
192 }
193 } else {
194 tracing::error!("Unknown event signature: {}", event_signature);
195 continue;
196 }
197 }
198 }
199 }
200 });
201 }
202
203 pub async fn request_contract_events_stream(
209 &self,
210 from_block: u64,
211 to_block: Option<u64>,
212 contract_address: &Address,
213 topics: Vec<&str>,
214 ) -> impl Stream<Item = Log> + use<> {
215 let query = Self::construct_contract_events_query(
216 from_block,
217 to_block,
218 vec![contract_address.clone()],
219 topics,
220 );
221
222 let mut rx = self
223 .client
224 .clone()
225 .stream(query, Default::default())
226 .await
227 .expect("Failed to create stream");
228
229 async_stream::stream! {
230 while let Some(response) = rx.recv().await {
231 let response = response.unwrap();
232
233 for batch in response.data.logs {
234 for log in batch {
235 yield log
236 }
237 }
238 }
239 }
240 }
241
242 pub fn disconnect(&mut self) {
244 self.unsubscribe_blocks();
245 }
246
247 pub async fn current_block(&self) -> u64 {
253 self.client.get_height().await.unwrap()
254 }
255
256 pub async fn request_blocks_stream(
262 &self,
263 from_block: u64,
264 to_block: Option<u64>,
265 ) -> impl Stream<Item = Block> {
266 let query = Self::construct_block_query(from_block, to_block);
267 let mut rx = self
268 .client
269 .clone()
270 .stream(query, Default::default())
271 .await
272 .unwrap();
273
274 let chain = self.chain.name;
275
276 async_stream::stream! {
277 while let Some(response) = rx.recv().await {
278 let response = response.unwrap();
279 for batch in response.data.blocks {
280 for received_block in batch {
281 let block = transform_hypersync_block(chain, received_block).unwrap();
282 yield block
283 }
284 }
285 }
286 }
287 }
288
289 pub fn subscribe_blocks(&mut self) {
295 if self.blocks_task.is_some() {
296 return;
297 }
298
299 let chain = self.chain.name;
300 let client = self.client.clone();
301 let tx = if let Some(tx) = &self.tx {
302 tx.clone()
303 } else {
304 tracing::error!("Hypersync client channel should have been initialized");
305 return;
306 };
307
308 let task = get_runtime().spawn(async move {
309 tracing::debug!("Starting task 'blocks_feed");
310
311 let current_block_height = client.get_height().await.unwrap();
312 let mut query = Self::construct_block_query(current_block_height, None);
313
314 loop {
315 let response = client.get(&query).await.unwrap();
316 for batch in response.data.blocks {
317 for received_block in batch {
318 let block = transform_hypersync_block(chain, received_block).unwrap();
319 let msg = BlockchainMessage::Block(block);
320 if let Err(e) = tx.send(msg) {
321 log::error!("Error sending message: {e}");
322 }
323 }
324 }
325
326 if let Some(archive_block_height) = response.archive_height
327 && archive_block_height < response.next_block
328 {
329 while client.get_height().await.unwrap() < response.next_block {
330 tokio::time::sleep(std::time::Duration::from_millis(
331 BLOCK_POLLING_INTERVAL_MS,
332 ))
333 .await;
334 }
335 }
336
337 query.from_block = response.next_block;
338 }
339 });
340
341 self.blocks_task = Some(task);
342 }
343
344 fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
346 let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
347 .fields
348 .iter()
349 .map(|x| x.name.clone())
350 .collect();
351
352 Query {
353 from_block,
354 to_block,
355 blocks: vec![BlockSelection::default()],
356 field_selection: FieldSelection {
357 block: all_block_fields,
358 ..Default::default()
359 },
360 ..Default::default()
361 }
362 }
363
364 fn construct_contract_events_query(
365 from_block: u64,
366 to_block: Option<u64>,
367 contract_addresses: Vec<Address>,
368 topics: Vec<&str>,
369 ) -> Query {
370 let mut query_value = serde_json::json!({
371 "from_block": from_block,
372 "logs": [{
373 "topics": [topics],
374 "address": contract_addresses
375 }],
376 "field_selection": {
377 "log": [
378 "block_number",
379 "transaction_hash",
380 "transaction_index",
381 "log_index",
382 "address",
383 "data",
384 "topic0",
385 "topic1",
386 "topic2",
387 "topic3",
388 ]
389 }
390 });
391
392 if let Some(to_block) = to_block
393 && let Some(obj) = query_value.as_object_mut()
394 {
395 obj.insert("to_block".to_string(), serde_json::json!(to_block));
396 }
397
398 serde_json::from_value(query_value).unwrap()
399 }
400
401 pub fn unsubscribe_blocks(&mut self) {
403 if let Some(task) = self.blocks_task.take() {
404 task.abort();
405 tracing::debug!("Unsubscribed from blocks");
406 }
407 }
408}