1use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
20#[derive(Debug)]
25pub struct DefiDataSubscriptionManager {
26 pool_swap_event_encoded: AHashMap<DexType, String>,
27 pool_mint_event_encoded: AHashMap<DexType, String>,
28 pool_burn_event_encoded: AHashMap<DexType, String>,
29 pool_collect_event_encoded: AHashMap<DexType, String>,
30 pool_flash_event_encoded: AHashMap<DexType, String>,
31 subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
32 subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
33 subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
34 subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
35 subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
36}
37
38impl Default for DefiDataSubscriptionManager {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl DefiDataSubscriptionManager {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 pool_swap_event_encoded: AHashMap::new(),
50 pool_burn_event_encoded: AHashMap::new(),
51 pool_mint_event_encoded: AHashMap::new(),
52 pool_collect_event_encoded: AHashMap::new(),
53 pool_flash_event_encoded: AHashMap::new(),
54 subscribed_pool_burns: AHashMap::new(),
55 subscribed_pool_mints: AHashMap::new(),
56 subscribed_pool_swaps: AHashMap::new(),
57 subscribed_pool_collects: AHashMap::new(),
58 subscribed_pool_flashes: AHashMap::new(),
59 }
60 }
61
62 #[must_use]
64 pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
65 let mut unique_addresses = AHashSet::new();
66
67 if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
68 unique_addresses.extend(addresses.iter().copied());
69 }
70 if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
71 unique_addresses.extend(addresses.iter().copied());
72 }
73 if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
74 unique_addresses.extend(addresses.iter().copied());
75 }
76 if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
77 unique_addresses.extend(addresses.iter().copied());
78 }
79 if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
80 unique_addresses.extend(addresses.iter().copied());
81 }
82
83 unique_addresses.into_iter().collect()
84 }
85
86 #[must_use]
88 pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
89 let mut result = Vec::new();
90
91 if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
92 result.push(swap_event_signature.clone());
93 }
94 if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
95 result.push(mint_event_signature.clone());
96 }
97 if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
98 result.push(burn_event_signature.clone());
99 }
100 if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
101 result.push(collect_event_signature.clone());
102 }
103 if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
104 result.push(flash_event_signature.clone());
105 }
106
107 result
108 }
109
110 #[must_use]
112 pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
113 self.pool_swap_event_encoded.get(dex).cloned()
114 }
115
116 #[must_use]
118 pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
119 self.pool_mint_event_encoded.get(dex).cloned()
120 }
121 #[must_use]
123 pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
124 self.pool_burn_event_encoded.get(dex).cloned()
125 }
126
127 #[must_use]
129 pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
130 self.pool_collect_event_encoded.get(dex).cloned()
131 }
132
133 fn normalize_topic(sig: &str) -> String {
142 let s = sig.trim();
143
144 if let Some(rest) = s.strip_prefix("0x") {
146 if rest.len() == 64 && rest.chars().all(|c| c.is_ascii_hexdigit()) {
147 return format!("0x{}", rest.to_ascii_lowercase());
148 }
149 }
150
151 if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
153 return format!("0x{}", s.to_ascii_lowercase());
154 }
155
156 format!("0x{}", hex::encode(keccak256(s.as_bytes())))
158 }
159
160 pub fn register_dex_for_subscriptions(
165 &mut self,
166 dex: DexType,
167 swap_event_signature: &str,
168 mint_event_signature: &str,
169 burn_event_signature: &str,
170 collect_event_signature: &str,
171 flash_event_signature: Option<&str>,
172 ) {
173 self.subscribed_pool_swaps.insert(dex, AHashSet::new());
174 self.pool_swap_event_encoded
175 .insert(dex, Self::normalize_topic(swap_event_signature));
176
177 self.subscribed_pool_mints.insert(dex, AHashSet::new());
178 self.pool_mint_event_encoded
179 .insert(dex, Self::normalize_topic(mint_event_signature));
180
181 self.subscribed_pool_burns.insert(dex, AHashSet::new());
182 self.pool_burn_event_encoded
183 .insert(dex, Self::normalize_topic(burn_event_signature));
184
185 self.subscribed_pool_collects.insert(dex, AHashSet::new());
186 self.pool_collect_event_encoded
187 .insert(dex, Self::normalize_topic(collect_event_signature));
188
189 if let Some(flash_event_signature) = flash_event_signature {
190 self.subscribed_pool_flashes.insert(dex, AHashSet::new());
191 self.pool_flash_event_encoded
192 .insert(dex, Self::normalize_topic(flash_event_signature));
193 }
194
195 tracing::info!("Registered DEX for subscriptions: {dex:?}");
196 }
197
198 pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
200 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
201 pool_set.insert(address);
202 } else {
203 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
204 }
205 }
206
207 pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
209 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
210 pool_set.insert(address);
211 } else {
212 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
213 }
214 }
215
216 pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
218 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
219 pool_set.insert(address);
220 } else {
221 tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
222 }
223 }
224
225 pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
227 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
228 pool_set.remove(&address);
229 } else {
230 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
231 }
232 }
233
234 pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
236 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
237 pool_set.remove(&address);
238 } else {
239 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
240 }
241 }
242
243 pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
245 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
246 pool_set.remove(&address);
247 } else {
248 tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
249 }
250 }
251
252 pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
254 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
255 pool_set.insert(address);
256 } else {
257 tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
258 }
259 }
260
261 pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
263 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
264 pool_set.remove(&address);
265 } else {
266 tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
267 }
268 }
269
270 pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
272 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
273 pool_set.insert(address);
274 } else {
275 tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
276 }
277 }
278
279 pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
281 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
282 pool_set.remove(&address);
283 } else {
284 tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
285 }
286 }
287}
288
289#[cfg(test)]
294mod tests {
295 use alloy::primitives::address;
296 use rstest::{fixture, rstest};
297
298 use super::*;
299
300 #[fixture]
301 fn manager() -> DefiDataSubscriptionManager {
302 DefiDataSubscriptionManager::new()
303 }
304
305 #[fixture]
306 fn registered_manager() -> DefiDataSubscriptionManager {
307 let mut manager = DefiDataSubscriptionManager::new();
308 manager.register_dex_for_subscriptions(
309 DexType::UniswapV3,
310 "Swap(address,address,int256,int256,uint160,uint128,int24)",
311 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
312 "Burn(address,int24,int24,uint128,uint256,uint256)",
313 "Collect(address,address,int24,int24,uint128,uint128)",
314 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
315 );
316 manager
317 }
318
319 #[rstest]
320 fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
321 assert_eq!(
322 manager
323 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
324 .len(),
325 0
326 );
327 assert_eq!(
328 manager
329 .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
330 .len(),
331 0
332 );
333 assert!(
334 manager
335 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
336 .is_none()
337 );
338 assert!(
339 manager
340 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
341 .is_none()
342 );
343 assert!(
344 manager
345 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
346 .is_none()
347 );
348 }
349
350 #[rstest]
351 fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
352 let signatures =
354 registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
355 assert_eq!(signatures.len(), 5);
356
357 assert!(
359 registered_manager
360 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
361 .is_some()
362 );
363 assert!(
364 registered_manager
365 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
366 .is_some()
367 );
368 assert!(
369 registered_manager
370 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
371 .is_some()
372 );
373 }
374
375 #[rstest]
376 fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
377 let pool_address = address!("1234567890123456789012345678901234567890");
378
379 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
381
382 let addresses =
383 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
384 assert_eq!(addresses.len(), 1);
385 assert_eq!(addresses[0], pool_address);
386 }
387
388 #[rstest]
389 fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
390 let pool_address = address!("1234567890123456789012345678901234567890");
391
392 manager.subscribe_swaps(DexType::UniswapV3, pool_address);
394 manager.subscribe_mints(DexType::UniswapV3, pool_address);
395 manager.subscribe_burns(DexType::UniswapV3, pool_address);
396
397 let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
399 assert_eq!(addresses.len(), 0);
400 }
401
402 #[rstest]
403 fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
404 let pool_address = address!("1234567890123456789012345678901234567890");
405
406 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
408
409 assert_eq!(
411 registered_manager
412 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
413 .len(),
414 1
415 );
416
417 registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
419
420 assert_eq!(
422 registered_manager
423 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
424 .len(),
425 0
426 );
427 }
428
429 #[rstest]
430 fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
431 let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
432 let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
433 let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);
434
435 assert!(swap_sig.is_some() && swap_sig.unwrap().starts_with("0x"));
437 assert!(mint_sig.is_some() && mint_sig.unwrap().starts_with("0x"));
438 assert!(burn_sig.is_some() && burn_sig.unwrap().starts_with("0x"));
439 }
440
441 #[rstest]
442 fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
443 let pool_address = address!("1234567890123456789012345678901234567890");
444
445 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
447 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
448
449 let addresses =
451 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
452 assert_eq!(addresses.len(), 1);
453 }
454
455 #[rstest]
456 fn test_get_combined_addresses_from_all_events(
457 mut registered_manager: DefiDataSubscriptionManager,
458 ) {
459 let pool1 = address!("1111111111111111111111111111111111111111");
460 let pool2 = address!("2222222222222222222222222222222222222222");
461 let pool3 = address!("3333333333333333333333333333333333333333");
462
463 registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
465 registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
466 registered_manager.subscribe_burns(DexType::UniswapV3, pool3);
467
468 let addresses =
470 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
471 assert_eq!(addresses.len(), 3);
472 assert!(addresses.contains(&pool1));
473 assert!(addresses.contains(&pool2));
474 assert!(addresses.contains(&pool3));
475 }
476
477 #[rstest]
478 fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
479 let swap_sig = registered_manager
482 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
483 .unwrap();
484
485 assert!(swap_sig.starts_with("0x"));
487 assert_eq!(swap_sig.len(), 66); let hex_part = &swap_sig[2..];
491 assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
492 }
493
494 #[rstest]
495 #[case(DexType::UniswapV3)]
496 #[case(DexType::UniswapV2)]
497 fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
498 let mut manager = DefiDataSubscriptionManager::new();
499 let pool1 = address!("1111111111111111111111111111111111111111");
500 let pool2 = address!("2222222222222222222222222222222222222222");
501
502 manager.register_dex_for_subscriptions(
504 dex_type,
505 "Swap(address,uint256,uint256)",
506 "Mint(address,uint256)",
507 "Burn(address,uint256)",
508 "Collect(address,uint256,uint256)",
509 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
510 );
511
512 manager.subscribe_swaps(dex_type, pool1);
514 manager.subscribe_swaps(dex_type, pool2);
515 manager.subscribe_mints(dex_type, pool1);
516 manager.subscribe_burns(dex_type, pool2);
517
518 let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
520 assert_eq!(addresses.len(), 2);
521 assert!(addresses.contains(&pool1));
522 assert!(addresses.contains(&pool2));
523
524 let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
526 assert_eq!(signatures.len(), 5);
527
528 manager.unsubscribe_swaps(dex_type, pool1);
530 manager.unsubscribe_burns(dex_type, pool2);
531
532 let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
534 assert!(remaining.contains(&pool1)); assert!(remaining.contains(&pool2)); }
537
538 #[rstest]
539 fn test_register_with_raw_signatures() {
540 let mut manager = DefiDataSubscriptionManager::new();
541
542 manager.register_dex_for_subscriptions(
544 DexType::UniswapV3,
545 "Swap(address,address,int256,int256,uint160,uint128,int24)",
546 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
547 "Burn(address,int24,int24,uint128,uint256,uint256)",
548 "Collect(address,address,int24,int24,uint128,uint128)",
549 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
550 );
551
552 let swap_sig = manager
554 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
555 .unwrap();
556 let mint_sig = manager
557 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
558 .unwrap();
559 let burn_sig = manager
560 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
561 .unwrap();
562
563 assert_eq!(
565 swap_sig,
566 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
567 );
568 assert_eq!(
569 mint_sig,
570 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
571 );
572 assert_eq!(
573 burn_sig,
574 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
575 );
576 }
577
578 #[rstest]
579 fn test_register_with_pre_encoded_signatures() {
580 let mut manager = DefiDataSubscriptionManager::new();
581
582 manager.register_dex_for_subscriptions(
584 DexType::UniswapV3,
585 "Swap(address,address,int256,int256,uint160,uint128,int24)",
586 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
587 "Burn(address,int24,int24,uint128,uint256,uint256)",
588 "Collect(address,address,int24,int24,uint128,uint128)",
589 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
590 );
591
592 let swap_sig = manager
594 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
595 .unwrap();
596 let mint_sig = manager
597 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
598 .unwrap();
599 let burn_sig = manager
600 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
601 .unwrap();
602
603 assert_eq!(
604 swap_sig,
605 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
606 );
607 assert_eq!(
608 mint_sig,
609 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
610 );
611 assert_eq!(
612 burn_sig,
613 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
614 );
615 }
616
617 #[rstest]
618 fn test_register_with_pre_encoded_signatures_no_prefix() {
619 let mut manager = DefiDataSubscriptionManager::new();
620
621 manager.register_dex_for_subscriptions(
623 DexType::UniswapV3,
624 "Swap(address,address,int256,int256,uint160,uint128,int24)",
625 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
626 "Burn(address,int24,int24,uint128,uint256,uint256)",
627 "Collect(address,address,int24,int24,uint128,uint128)",
628 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
629 );
630
631 let swap_sig = manager
633 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
634 .unwrap();
635 let mint_sig = manager
636 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
637 .unwrap();
638 let burn_sig = manager
639 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
640 .unwrap();
641
642 assert_eq!(
643 swap_sig,
644 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
645 );
646 assert_eq!(
647 mint_sig,
648 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
649 );
650 assert_eq!(
651 burn_sig,
652 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
653 );
654 }
655}