1use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
20#[derive(Debug)]
25pub struct DefiDataSubscriptionManager {
26 pool_swap_event_encoded: AHashMap<DexType, String>,
27 pool_mint_event_encoded: AHashMap<DexType, String>,
28 pool_burn_event_encoded: AHashMap<DexType, String>,
29 pool_collect_event_encoded: AHashMap<DexType, String>,
30 pool_flash_event_encoded: AHashMap<DexType, String>,
31 subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
32 subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
33 subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
34 subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
35 subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
36}
37
38impl Default for DefiDataSubscriptionManager {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl DefiDataSubscriptionManager {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 pool_swap_event_encoded: AHashMap::new(),
50 pool_burn_event_encoded: AHashMap::new(),
51 pool_mint_event_encoded: AHashMap::new(),
52 pool_collect_event_encoded: AHashMap::new(),
53 pool_flash_event_encoded: AHashMap::new(),
54 subscribed_pool_burns: AHashMap::new(),
55 subscribed_pool_mints: AHashMap::new(),
56 subscribed_pool_swaps: AHashMap::new(),
57 subscribed_pool_collects: AHashMap::new(),
58 subscribed_pool_flashes: AHashMap::new(),
59 }
60 }
61
62 #[must_use]
64 pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
65 let mut unique_addresses = AHashSet::new();
66
67 if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
68 unique_addresses.extend(addresses.iter().copied());
69 }
70 if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
71 unique_addresses.extend(addresses.iter().copied());
72 }
73 if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
74 unique_addresses.extend(addresses.iter().copied());
75 }
76 if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
77 unique_addresses.extend(addresses.iter().copied());
78 }
79 if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
80 unique_addresses.extend(addresses.iter().copied());
81 }
82
83 unique_addresses.into_iter().collect()
84 }
85
86 #[must_use]
88 pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
89 let mut result = Vec::new();
90
91 if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
92 result.push(swap_event_signature.clone());
93 }
94 if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
95 result.push(mint_event_signature.clone());
96 }
97 if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
98 result.push(burn_event_signature.clone());
99 }
100 if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
101 result.push(collect_event_signature.clone());
102 }
103 if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
104 result.push(flash_event_signature.clone());
105 }
106
107 result
108 }
109
110 #[must_use]
112 pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
113 self.pool_swap_event_encoded.get(dex).cloned()
114 }
115
116 #[must_use]
118 pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
119 self.pool_mint_event_encoded.get(dex).cloned()
120 }
121 #[must_use]
123 pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
124 self.pool_burn_event_encoded.get(dex).cloned()
125 }
126
127 #[must_use]
129 pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
130 self.pool_collect_event_encoded.get(dex).cloned()
131 }
132
133 fn normalize_topic(sig: &str) -> String {
142 let s = sig.trim();
143
144 if let Some(rest) = s.strip_prefix("0x")
146 && rest.len() == 64
147 && rest.chars().all(|c| c.is_ascii_hexdigit())
148 {
149 return format!("0x{}", rest.to_ascii_lowercase());
150 }
151
152 if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
154 return format!("0x{}", s.to_ascii_lowercase());
155 }
156
157 format!("0x{}", hex::encode(keccak256(s.as_bytes())))
159 }
160
161 pub fn register_dex_for_subscriptions(
166 &mut self,
167 dex: DexType,
168 swap_event_signature: &str,
169 mint_event_signature: &str,
170 burn_event_signature: &str,
171 collect_event_signature: &str,
172 flash_event_signature: Option<&str>,
173 ) {
174 self.subscribed_pool_swaps.insert(dex, AHashSet::new());
175 self.pool_swap_event_encoded
176 .insert(dex, Self::normalize_topic(swap_event_signature));
177
178 self.subscribed_pool_mints.insert(dex, AHashSet::new());
179 self.pool_mint_event_encoded
180 .insert(dex, Self::normalize_topic(mint_event_signature));
181
182 self.subscribed_pool_burns.insert(dex, AHashSet::new());
183 self.pool_burn_event_encoded
184 .insert(dex, Self::normalize_topic(burn_event_signature));
185
186 self.subscribed_pool_collects.insert(dex, AHashSet::new());
187 self.pool_collect_event_encoded
188 .insert(dex, Self::normalize_topic(collect_event_signature));
189
190 if let Some(flash_event_signature) = flash_event_signature {
191 self.subscribed_pool_flashes.insert(dex, AHashSet::new());
192 self.pool_flash_event_encoded
193 .insert(dex, Self::normalize_topic(flash_event_signature));
194 }
195
196 tracing::info!("Registered DEX for subscriptions: {dex:?}");
197 }
198
199 pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
201 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
202 pool_set.insert(address);
203 } else {
204 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
205 }
206 }
207
208 pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
210 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
211 pool_set.insert(address);
212 } else {
213 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
214 }
215 }
216
217 pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
219 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
220 pool_set.insert(address);
221 } else {
222 tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
223 }
224 }
225
226 pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
228 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
229 pool_set.remove(&address);
230 } else {
231 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
232 }
233 }
234
235 pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
237 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
238 pool_set.remove(&address);
239 } else {
240 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
241 }
242 }
243
244 pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
246 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
247 pool_set.remove(&address);
248 } else {
249 tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
250 }
251 }
252
253 pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
255 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
256 pool_set.insert(address);
257 } else {
258 tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
259 }
260 }
261
262 pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
264 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
265 pool_set.remove(&address);
266 } else {
267 tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
268 }
269 }
270
271 pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
273 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
274 pool_set.insert(address);
275 } else {
276 tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
277 }
278 }
279
280 pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
282 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
283 pool_set.remove(&address);
284 } else {
285 tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
286 }
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use alloy::primitives::address;
293 use rstest::{fixture, rstest};
294
295 use super::*;
296
297 #[fixture]
298 fn manager() -> DefiDataSubscriptionManager {
299 DefiDataSubscriptionManager::new()
300 }
301
302 #[fixture]
303 fn registered_manager() -> DefiDataSubscriptionManager {
304 let mut manager = DefiDataSubscriptionManager::new();
305 manager.register_dex_for_subscriptions(
306 DexType::UniswapV3,
307 "Swap(address,address,int256,int256,uint160,uint128,int24)",
308 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
309 "Burn(address,int24,int24,uint128,uint256,uint256)",
310 "Collect(address,address,int24,int24,uint128,uint128)",
311 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
312 );
313 manager
314 }
315
316 #[rstest]
317 fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
318 assert_eq!(
319 manager
320 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
321 .len(),
322 0
323 );
324 assert_eq!(
325 manager
326 .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
327 .len(),
328 0
329 );
330 assert!(
331 manager
332 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
333 .is_none()
334 );
335 assert!(
336 manager
337 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
338 .is_none()
339 );
340 assert!(
341 manager
342 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
343 .is_none()
344 );
345 }
346
347 #[rstest]
348 fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
349 let signatures =
351 registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
352 assert_eq!(signatures.len(), 5);
353
354 assert!(
356 registered_manager
357 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
358 .is_some()
359 );
360 assert!(
361 registered_manager
362 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
363 .is_some()
364 );
365 assert!(
366 registered_manager
367 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
368 .is_some()
369 );
370 }
371
372 #[rstest]
373 fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
374 let pool_address = address!("1234567890123456789012345678901234567890");
375
376 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
378
379 let addresses =
380 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
381 assert_eq!(addresses.len(), 1);
382 assert_eq!(addresses[0], pool_address);
383 }
384
385 #[rstest]
386 fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
387 let pool_address = address!("1234567890123456789012345678901234567890");
388
389 manager.subscribe_swaps(DexType::UniswapV3, pool_address);
391 manager.subscribe_mints(DexType::UniswapV3, pool_address);
392 manager.subscribe_burns(DexType::UniswapV3, pool_address);
393
394 let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
396 assert_eq!(addresses.len(), 0);
397 }
398
399 #[rstest]
400 fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
401 let pool_address = address!("1234567890123456789012345678901234567890");
402
403 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
405
406 assert_eq!(
408 registered_manager
409 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
410 .len(),
411 1
412 );
413
414 registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
416
417 assert_eq!(
419 registered_manager
420 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
421 .len(),
422 0
423 );
424 }
425
426 #[rstest]
427 fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
428 let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
429 let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
430 let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);
431
432 assert!(swap_sig.is_some() && swap_sig.unwrap().starts_with("0x"));
434 assert!(mint_sig.is_some() && mint_sig.unwrap().starts_with("0x"));
435 assert!(burn_sig.is_some() && burn_sig.unwrap().starts_with("0x"));
436 }
437
438 #[rstest]
439 fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
440 let pool_address = address!("1234567890123456789012345678901234567890");
441
442 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
444 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
445
446 let addresses =
448 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
449 assert_eq!(addresses.len(), 1);
450 }
451
452 #[rstest]
453 fn test_get_combined_addresses_from_all_events(
454 mut registered_manager: DefiDataSubscriptionManager,
455 ) {
456 let pool1 = address!("1111111111111111111111111111111111111111");
457 let pool2 = address!("2222222222222222222222222222222222222222");
458 let pool3 = address!("3333333333333333333333333333333333333333");
459
460 registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
462 registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
463 registered_manager.subscribe_burns(DexType::UniswapV3, pool3);
464
465 let addresses =
467 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
468 assert_eq!(addresses.len(), 3);
469 assert!(addresses.contains(&pool1));
470 assert!(addresses.contains(&pool2));
471 assert!(addresses.contains(&pool3));
472 }
473
474 #[rstest]
475 fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
476 let swap_sig = registered_manager
479 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
480 .unwrap();
481
482 assert!(swap_sig.starts_with("0x"));
484 assert_eq!(swap_sig.len(), 66); let hex_part = &swap_sig[2..];
488 assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
489 }
490
491 #[rstest]
492 #[case(DexType::UniswapV3)]
493 #[case(DexType::UniswapV2)]
494 fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
495 let mut manager = DefiDataSubscriptionManager::new();
496 let pool1 = address!("1111111111111111111111111111111111111111");
497 let pool2 = address!("2222222222222222222222222222222222222222");
498
499 manager.register_dex_for_subscriptions(
501 dex_type,
502 "Swap(address,uint256,uint256)",
503 "Mint(address,uint256)",
504 "Burn(address,uint256)",
505 "Collect(address,uint256,uint256)",
506 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
507 );
508
509 manager.subscribe_swaps(dex_type, pool1);
511 manager.subscribe_swaps(dex_type, pool2);
512 manager.subscribe_mints(dex_type, pool1);
513 manager.subscribe_burns(dex_type, pool2);
514
515 let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
517 assert_eq!(addresses.len(), 2);
518 assert!(addresses.contains(&pool1));
519 assert!(addresses.contains(&pool2));
520
521 let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
523 assert_eq!(signatures.len(), 5);
524
525 manager.unsubscribe_swaps(dex_type, pool1);
527 manager.unsubscribe_burns(dex_type, pool2);
528
529 let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
531 assert!(remaining.contains(&pool1)); assert!(remaining.contains(&pool2)); }
534
535 #[rstest]
536 fn test_register_with_raw_signatures() {
537 let mut manager = DefiDataSubscriptionManager::new();
538
539 manager.register_dex_for_subscriptions(
541 DexType::UniswapV3,
542 "Swap(address,address,int256,int256,uint160,uint128,int24)",
543 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
544 "Burn(address,int24,int24,uint128,uint256,uint256)",
545 "Collect(address,address,int24,int24,uint128,uint128)",
546 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
547 );
548
549 let swap_sig = manager
551 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
552 .unwrap();
553 let mint_sig = manager
554 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
555 .unwrap();
556 let burn_sig = manager
557 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
558 .unwrap();
559
560 assert_eq!(
562 swap_sig,
563 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
564 );
565 assert_eq!(
566 mint_sig,
567 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
568 );
569 assert_eq!(
570 burn_sig,
571 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
572 );
573 }
574
575 #[rstest]
576 fn test_register_with_pre_encoded_signatures() {
577 let mut manager = DefiDataSubscriptionManager::new();
578
579 manager.register_dex_for_subscriptions(
581 DexType::UniswapV3,
582 "Swap(address,address,int256,int256,uint160,uint128,int24)",
583 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
584 "Burn(address,int24,int24,uint128,uint256,uint256)",
585 "Collect(address,address,int24,int24,uint128,uint128)",
586 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
587 );
588
589 let swap_sig = manager
591 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
592 .unwrap();
593 let mint_sig = manager
594 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
595 .unwrap();
596 let burn_sig = manager
597 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
598 .unwrap();
599
600 assert_eq!(
601 swap_sig,
602 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
603 );
604 assert_eq!(
605 mint_sig,
606 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
607 );
608 assert_eq!(
609 burn_sig,
610 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
611 );
612 }
613
614 #[rstest]
615 fn test_register_with_pre_encoded_signatures_no_prefix() {
616 let mut manager = DefiDataSubscriptionManager::new();
617
618 manager.register_dex_for_subscriptions(
620 DexType::UniswapV3,
621 "Swap(address,address,int256,int256,uint160,uint128,int24)",
622 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
623 "Burn(address,int24,int24,uint128,uint256,uint256)",
624 "Collect(address,address,int24,int24,uint128,uint128)",
625 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
626 );
627
628 let swap_sig = manager
630 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
631 .unwrap();
632 let mint_sig = manager
633 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
634 .unwrap();
635 let burn_sig = manager
636 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
637 .unwrap();
638
639 assert_eq!(
640 swap_sig,
641 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
642 );
643 assert_eq!(
644 mint_sig,
645 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
646 );
647 assert_eq!(
648 burn_sig,
649 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
650 );
651 }
652}