1use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
/// Tracks, per DEX, the encoded pool event topics and the pool addresses subscribed to each
/// pool event type (swap, mint, burn, collect, flash).
///
/// A DEX has to be registered through `register_dex_for_subscriptions` before addresses can
/// be subscribed for it; subscribing against an unregistered DEX is logged and ignored.
#[derive(Debug)]
pub struct DefiDataSubscriptionManager {
    /// Normalized swap event topic (`0x` + 64 lowercase hex characters) per registered DEX.
    pool_swap_event_encoded: AHashMap<DexType, String>,
    /// Normalized mint event topic per registered DEX.
    pool_mint_event_encoded: AHashMap<DexType, String>,
    /// Normalized burn event topic per registered DEX.
    pool_burn_event_encoded: AHashMap<DexType, String>,
    /// Normalized collect event topic per registered DEX.
    pool_collect_event_encoded: AHashMap<DexType, String>,
    /// Normalized flash event topic per DEX; only present when one was supplied at registration.
    pool_flash_event_encoded: AHashMap<DexType, String>,
    /// Pool addresses subscribed to swap events, keyed by DEX.
    subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to mint events, keyed by DEX.
    subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to burn events, keyed by DEX.
    subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to collect events, keyed by DEX.
    subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to flash events, keyed by DEX.
    subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
}
37
38impl Default for DefiDataSubscriptionManager {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl DefiDataSubscriptionManager {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 pool_swap_event_encoded: AHashMap::new(),
50 pool_burn_event_encoded: AHashMap::new(),
51 pool_mint_event_encoded: AHashMap::new(),
52 pool_collect_event_encoded: AHashMap::new(),
53 pool_flash_event_encoded: AHashMap::new(),
54 subscribed_pool_burns: AHashMap::new(),
55 subscribed_pool_mints: AHashMap::new(),
56 subscribed_pool_swaps: AHashMap::new(),
57 subscribed_pool_collects: AHashMap::new(),
58 subscribed_pool_flashes: AHashMap::new(),
59 }
60 }
61
62 #[must_use]
64 pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
65 let mut unique_addresses = AHashSet::new();
66
67 if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
68 unique_addresses.extend(addresses.iter().copied());
69 }
70 if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
71 unique_addresses.extend(addresses.iter().copied());
72 }
73 if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
74 unique_addresses.extend(addresses.iter().copied());
75 }
76 if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
77 unique_addresses.extend(addresses.iter().copied());
78 }
79 if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
80 unique_addresses.extend(addresses.iter().copied());
81 }
82
83 unique_addresses.into_iter().collect()
84 }
85
86 #[must_use]
88 pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
89 let mut result = Vec::new();
90
91 if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
92 result.push(swap_event_signature.clone());
93 }
94 if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
95 result.push(mint_event_signature.clone());
96 }
97 if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
98 result.push(burn_event_signature.clone());
99 }
100 if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
101 result.push(collect_event_signature.clone());
102 }
103 if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
104 result.push(flash_event_signature.clone());
105 }
106
107 result
108 }
109
110 #[must_use]
112 pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
113 self.pool_swap_event_encoded.get(dex).cloned()
114 }
115
116 #[must_use]
118 pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
119 self.pool_mint_event_encoded.get(dex).cloned()
120 }
121 #[must_use]
123 pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
124 self.pool_burn_event_encoded.get(dex).cloned()
125 }
126
127 #[must_use]
129 pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
130 self.pool_collect_event_encoded.get(dex).cloned()
131 }
132
133 fn normalize_topic(sig: &str) -> String {
142 let s = sig.trim();
143
144 if let Some(rest) = s.strip_prefix("0x")
146 && rest.len() == 64
147 && rest.chars().all(|c| c.is_ascii_hexdigit())
148 {
149 return format!("0x{}", rest.to_ascii_lowercase());
150 }
151
152 if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
154 return format!("0x{}", s.to_ascii_lowercase());
155 }
156
157 format!("0x{}", hex::encode(keccak256(s.as_bytes())))
159 }
160
161 pub fn register_dex_for_subscriptions(
166 &mut self,
167 dex: DexType,
168 swap_event_signature: &str,
169 mint_event_signature: &str,
170 burn_event_signature: &str,
171 collect_event_signature: &str,
172 flash_event_signature: Option<&str>,
173 ) {
174 self.subscribed_pool_swaps.insert(dex, AHashSet::new());
175 self.pool_swap_event_encoded
176 .insert(dex, Self::normalize_topic(swap_event_signature));
177
178 self.subscribed_pool_mints.insert(dex, AHashSet::new());
179 self.pool_mint_event_encoded
180 .insert(dex, Self::normalize_topic(mint_event_signature));
181
182 self.subscribed_pool_burns.insert(dex, AHashSet::new());
183 self.pool_burn_event_encoded
184 .insert(dex, Self::normalize_topic(burn_event_signature));
185
186 self.subscribed_pool_collects.insert(dex, AHashSet::new());
187 self.pool_collect_event_encoded
188 .insert(dex, Self::normalize_topic(collect_event_signature));
189
190 if let Some(flash_event_signature) = flash_event_signature {
191 self.subscribed_pool_flashes.insert(dex, AHashSet::new());
192 self.pool_flash_event_encoded
193 .insert(dex, Self::normalize_topic(flash_event_signature));
194 }
195
196 tracing::info!("Registered DEX for subscriptions: {dex:?}");
197 }
198
199 pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
201 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
202 pool_set.insert(address);
203 } else {
204 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
205 }
206 }
207
208 pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
210 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
211 pool_set.insert(address);
212 } else {
213 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
214 }
215 }
216
217 pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
219 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
220 pool_set.insert(address);
221 } else {
222 tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
223 }
224 }
225
226 pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
228 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
229 pool_set.remove(&address);
230 } else {
231 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
232 }
233 }
234
235 pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
237 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
238 pool_set.remove(&address);
239 } else {
240 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
241 }
242 }
243
244 pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
246 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
247 pool_set.remove(&address);
248 } else {
249 tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
250 }
251 }
252
253 pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
255 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
256 pool_set.insert(address);
257 } else {
258 tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
259 }
260 }
261
262 pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
264 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
265 pool_set.remove(&address);
266 } else {
267 tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
268 }
269 }
270
271 pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
273 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
274 pool_set.insert(address);
275 } else {
276 tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
277 }
278 }
279
280 pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
282 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
283 pool_set.remove(&address);
284 } else {
285 tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use alloy::primitives::address;
    use rstest::{fixture, rstest};

    use super::*;

    // Raw Uniswap V3 pool event signatures.
    const SWAP_SIGNATURE: &str = "Swap(address,address,int256,int256,uint160,uint128,int24)";
    const MINT_SIGNATURE: &str = "Mint(address,address,int24,int24,uint128,uint256,uint256)";
    const BURN_SIGNATURE: &str = "Burn(address,int24,int24,uint128,uint256,uint256)";
    const COLLECT_SIGNATURE: &str = "Collect(address,address,int24,int24,uint128,uint128)";
    const FLASH_SIGNATURE: &str = "Flash(address,address,uint256,uint256,uint256,uint256)";

    // Expected keccak256 topics for the swap, mint and burn signatures above.
    const SWAP_TOPIC: &str = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67";
    const MINT_TOPIC: &str = "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde";
    const BURN_TOPIC: &str = "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c";

    /// Returns the 64 hex characters of a topic constant, dropping its `0x` prefix.
    fn without_prefix(topic: &str) -> &str {
        topic
            .strip_prefix("0x")
            .expect("topic constants carry a 0x prefix")
    }

    /// Asserts that the swap, mint and burn topics stored for Uniswap V3 are the expected hashes.
    fn assert_uniswap_v3_topics(manager: &DefiDataSubscriptionManager) {
        let swap_sig = manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();
        let mint_sig = manager
            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
            .unwrap();
        let burn_sig = manager
            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
            .unwrap();

        assert_eq!(swap_sig, SWAP_TOPIC);
        assert_eq!(mint_sig, MINT_TOPIC);
        assert_eq!(burn_sig, BURN_TOPIC);
    }

    #[fixture]
    fn manager() -> DefiDataSubscriptionManager {
        DefiDataSubscriptionManager::new()
    }

    #[fixture]
    fn registered_manager() -> DefiDataSubscriptionManager {
        let mut manager = DefiDataSubscriptionManager::new();
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_SIGNATURE,
            MINT_SIGNATURE,
            BURN_SIGNATURE,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );
        manager
    }

    #[rstest]
    fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
        assert_eq!(
            manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .len(),
            0
        );
        assert_eq!(
            manager
                .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
                .len(),
            0
        );
        assert!(
            manager
                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
                .is_none()
        );
    }

    #[rstest]
    fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
        // Swap, mint, burn, collect and flash were all registered by the fixture.
        let signatures =
            registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
        assert_eq!(signatures.len(), 5);

        assert!(
            registered_manager
                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
                .is_some()
        );
    }

    #[rstest]
    fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 1);
        assert_eq!(addresses[0], pool_address);
    }

    #[rstest]
    fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribing without prior registration is logged and ignored.
        manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        manager.subscribe_mints(DexType::UniswapV3, pool_address);
        manager.subscribe_burns(DexType::UniswapV3, pool_address);

        let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 0);
    }

    #[rstest]
    fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        assert_eq!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .len(),
            1
        );

        registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);

        assert_eq!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .len(),
            0
        );
    }

    #[rstest]
    fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
        let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
        let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
        let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);

        assert!(swap_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(mint_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(burn_sig.is_some_and(|sig| sig.starts_with("0x")));
    }

    #[rstest]
    fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribing the same pool twice must not create a duplicate entry.
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 1);
    }

    #[rstest]
    fn test_get_combined_addresses_from_all_events(
        mut registered_manager: DefiDataSubscriptionManager,
    ) {
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");
        let pool3 = address!("3333333333333333333333333333333333333333");

        registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
        registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
        registered_manager.subscribe_burns(DexType::UniswapV3, pool3);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 3);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));
        assert!(addresses.contains(&pool3));
    }

    #[rstest]
    fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
        let swap_sig = registered_manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();

        // "0x" prefix plus 64 hex characters (32 bytes).
        assert!(swap_sig.starts_with("0x"));
        assert_eq!(swap_sig.len(), 66);

        let hex_part = &swap_sig[2..];
        assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[rstest]
    #[case(DexType::UniswapV3)]
    #[case(DexType::UniswapV2)]
    fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
        let mut manager = DefiDataSubscriptionManager::new();
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");

        manager.register_dex_for_subscriptions(
            dex_type,
            "Swap(address,uint256,uint256)",
            "Mint(address,uint256)",
            "Burn(address,uint256)",
            "Collect(address,uint256,uint256)",
            Some(FLASH_SIGNATURE),
        );

        manager.subscribe_swaps(dex_type, pool1);
        manager.subscribe_swaps(dex_type, pool2);
        manager.subscribe_mints(dex_type, pool1);
        manager.subscribe_burns(dex_type, pool2);

        let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
        assert_eq!(addresses.len(), 2);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));

        let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
        assert_eq!(signatures.len(), 5);

        manager.unsubscribe_swaps(dex_type, pool1);
        manager.unsubscribe_burns(dex_type, pool2);

        let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
        assert!(remaining.contains(&pool1)); // Still subscribed to mints
        assert!(remaining.contains(&pool2)); // Still subscribed to swaps
    }

    #[rstest]
    fn test_register_with_raw_signatures() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Raw signatures must be hashed with keccak256.
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_SIGNATURE,
            MINT_SIGNATURE,
            BURN_SIGNATURE,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        assert_uniswap_v3_topics(&manager);
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Pre-encoded topics with a 0x prefix must be stored as-is, not hashed again.
        // Each argument is normalized independently, so collect/flash stay raw signatures.
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_TOPIC,
            MINT_TOPIC,
            BURN_TOPIC,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        assert_uniswap_v3_topics(&manager);
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures_no_prefix() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Pre-encoded topics without a 0x prefix must gain the prefix and not be hashed again.
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            without_prefix(SWAP_TOPIC),
            without_prefix(MINT_TOPIC),
            without_prefix(BURN_TOPIC),
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        assert_uniswap_v3_topics(&manager);
    }

    #[rstest]
    fn test_register_normalizes_case_and_whitespace_without_flash() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Uppercase hex digits and surrounding whitespace are normalized away.
        let padded_upper_swap = format!(
            "  0x{}  ",
            without_prefix(SWAP_TOPIC).to_ascii_uppercase()
        );

        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            &padded_upper_swap,
            MINT_TOPIC,
            BURN_TOPIC,
            COLLECT_SIGNATURE,
            None,
        );

        assert_uniswap_v3_topics(&manager);

        // No flash signature was supplied, so only four topics are registered.
        assert_eq!(
            manager
                .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
                .len(),
            4
        );
    }
}