nautilus_blockchain/data/
subscription.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
20/// Manages subscriptions to DeFi protocol events (swaps, mints, burns, collects) across different DEXs.
21///
22/// This manager tracks which pool addresses are subscribed for each event type
23/// and maintains the event signature encodings for efficient filtering.
24#[derive(Debug)]
25pub struct DefiDataSubscriptionManager {
26    pool_swap_event_encoded: AHashMap<DexType, String>,
27    pool_mint_event_encoded: AHashMap<DexType, String>,
28    pool_burn_event_encoded: AHashMap<DexType, String>,
29    pool_collect_event_encoded: AHashMap<DexType, String>,
30    pool_flash_event_encoded: AHashMap<DexType, String>,
31    subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
32    subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
33    subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
34    subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
35    subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
36}
37
38impl Default for DefiDataSubscriptionManager {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl DefiDataSubscriptionManager {
45    /// Creates a new [`DefiDataSubscriptionManager`] instance.
46    #[must_use]
47    pub fn new() -> Self {
48        Self {
49            pool_swap_event_encoded: AHashMap::new(),
50            pool_burn_event_encoded: AHashMap::new(),
51            pool_mint_event_encoded: AHashMap::new(),
52            pool_collect_event_encoded: AHashMap::new(),
53            pool_flash_event_encoded: AHashMap::new(),
54            subscribed_pool_burns: AHashMap::new(),
55            subscribed_pool_mints: AHashMap::new(),
56            subscribed_pool_swaps: AHashMap::new(),
57            subscribed_pool_collects: AHashMap::new(),
58            subscribed_pool_flashes: AHashMap::new(),
59        }
60    }
61
62    /// Gets all unique contract addresses subscribed for any event type for a given DEX.
63    #[must_use]
64    pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
65        let mut unique_addresses = AHashSet::new();
66
67        if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
68            unique_addresses.extend(addresses.iter().copied());
69        }
70        if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
71            unique_addresses.extend(addresses.iter().copied());
72        }
73        if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
74            unique_addresses.extend(addresses.iter().copied());
75        }
76        if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
77            unique_addresses.extend(addresses.iter().copied());
78        }
79        if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
80            unique_addresses.extend(addresses.iter().copied());
81        }
82
83        unique_addresses.into_iter().collect()
84    }
85
86    /// Gets all event signatures (keccak256 hashes) registered for a given DEX.
87    #[must_use]
88    pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
89        let mut result = Vec::new();
90
91        if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
92            result.push(swap_event_signature.clone());
93        }
94        if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
95            result.push(mint_event_signature.clone());
96        }
97        if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
98            result.push(burn_event_signature.clone());
99        }
100        if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
101            result.push(collect_event_signature.clone());
102        }
103        if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
104            result.push(flash_event_signature.clone());
105        }
106
107        result
108    }
109
110    /// Gets the swap event signature for a specific DEX.
111    #[must_use]
112    pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
113        self.pool_swap_event_encoded.get(dex).cloned()
114    }
115
116    /// Gets the mint event signature for a specific DEX.
117    #[must_use]
118    pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
119        self.pool_mint_event_encoded.get(dex).cloned()
120    }
121    /// Gets the burn event signature for a specific DEX.
122    #[must_use]
123    pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
124        self.pool_burn_event_encoded.get(dex).cloned()
125    }
126
127    /// Gets the collect event signature for a specific DEX.
128    #[must_use]
129    pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
130        self.pool_collect_event_encoded.get(dex).cloned()
131    }
132
133    /// Normalizes an event signature to a consistent format.
134    ///
135    /// Accepts:
136    /// - A raw event signature like "Swap(address,address,int256,int256,uint160,uint128,int24)".
137    /// - A pre-encoded topic like "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67".
138    /// - A hex string without 0x prefix.
139    ///
140    /// Returns a normalized "0x..." format string.
141    fn normalize_topic(sig: &str) -> String {
142        let s = sig.trim();
143
144        // Check if it's already a properly formatted hex string with 0x prefix
145        if let Some(rest) = s.strip_prefix("0x")
146            && rest.len() == 64
147            && rest.chars().all(|c| c.is_ascii_hexdigit())
148        {
149            return format!("0x{}", rest.to_ascii_lowercase());
150        }
151
152        // Check if it's a hex string without 0x prefix
153        if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
154            return format!("0x{}", s.to_ascii_lowercase());
155        }
156
157        // Otherwise, it's a raw signature that needs hashing
158        format!("0x{}", hex::encode(keccak256(s.as_bytes())))
159    }
160
161    /// Registers a DEX with its event signatures for subscription management.
162    ///
163    /// This must be called before subscribing to any events for a DEX.
164    /// Event signatures can be either raw signatures or pre-encoded keccak256 hashes.
165    pub fn register_dex_for_subscriptions(
166        &mut self,
167        dex: DexType,
168        swap_event_signature: &str,
169        mint_event_signature: &str,
170        burn_event_signature: &str,
171        collect_event_signature: &str,
172        flash_event_signature: Option<&str>,
173    ) {
174        self.subscribed_pool_swaps.insert(dex, AHashSet::new());
175        self.pool_swap_event_encoded
176            .insert(dex, Self::normalize_topic(swap_event_signature));
177
178        self.subscribed_pool_mints.insert(dex, AHashSet::new());
179        self.pool_mint_event_encoded
180            .insert(dex, Self::normalize_topic(mint_event_signature));
181
182        self.subscribed_pool_burns.insert(dex, AHashSet::new());
183        self.pool_burn_event_encoded
184            .insert(dex, Self::normalize_topic(burn_event_signature));
185
186        self.subscribed_pool_collects.insert(dex, AHashSet::new());
187        self.pool_collect_event_encoded
188            .insert(dex, Self::normalize_topic(collect_event_signature));
189
190        if let Some(flash_event_signature) = flash_event_signature {
191            self.subscribed_pool_flashes.insert(dex, AHashSet::new());
192            self.pool_flash_event_encoded
193                .insert(dex, Self::normalize_topic(flash_event_signature));
194        }
195
196        tracing::info!("Registered DEX for subscriptions: {dex:?}");
197    }
198
199    /// Subscribes to swap events for a specific pool address on a DEX.
200    pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
201        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
202            pool_set.insert(address);
203        } else {
204            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
205        }
206    }
207
208    /// Subscribes to mint events for a specific pool address on a DEX.
209    pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
210        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
211            pool_set.insert(address);
212        } else {
213            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
214        }
215    }
216
217    /// Subscribes to burn events for a specific pool address on a DEX.
218    pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
219        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
220            pool_set.insert(address);
221        } else {
222            tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
223        }
224    }
225
226    /// Unsubscribes from swap events for a specific pool address on a DEX.
227    pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
228        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
229            pool_set.remove(&address);
230        } else {
231            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
232        }
233    }
234
235    /// Unsubscribes from mint events for a specific pool address on a DEX.
236    pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
237        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
238            pool_set.remove(&address);
239        } else {
240            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
241        }
242    }
243
244    /// Unsubscribes from burn events for a specific pool address on a DEX.
245    pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
246        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
247            pool_set.remove(&address);
248        } else {
249            tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
250        }
251    }
252
253    /// Subscribes to collect events for a specific pool address on a DEX.
254    pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
255        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
256            pool_set.insert(address);
257        } else {
258            tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
259        }
260    }
261
262    /// Unsubscribes from collect events for a specific pool address on a DEX.
263    pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
264        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
265            pool_set.remove(&address);
266        } else {
267            tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
268        }
269    }
270
271    /// Subscribes to flash events for a specific pool address on a DEX.
272    pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
273        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
274            pool_set.insert(address);
275        } else {
276            tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
277        }
278    }
279
280    /// Unsubscribes from flash events for a specific pool address on a DEX.
281    pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
282        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
283            pool_set.remove(&address);
284        } else {
285            tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
286        }
287    }
288}
289
#[cfg(test)]
mod tests {
    use alloy::primitives::address;
    use rstest::{fixture, rstest};

    use super::*;

    // Raw UniswapV3 pool event signatures used to register a DEX.
    const SWAP_SIGNATURE: &str = "Swap(address,address,int256,int256,uint160,uint128,int24)";
    const MINT_SIGNATURE: &str = "Mint(address,address,int24,int24,uint128,uint256,uint256)";
    const BURN_SIGNATURE: &str = "Burn(address,int24,int24,uint128,uint256,uint256)";
    const COLLECT_SIGNATURE: &str = "Collect(address,address,int24,int24,uint128,uint128)";
    const FLASH_SIGNATURE: &str = "Flash(address,address,uint256,uint256,uint256,uint256)";

    // Expected keccak256 topics of the swap, mint and burn signatures above.
    const SWAP_TOPIC: &str = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67";
    const MINT_TOPIC: &str = "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde";
    const BURN_TOPIC: &str = "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c";

    // Well-formed pre-encoded topics for the collect and flash slots. Only their format
    // matters to these tests (presumably the UniswapV3 Collect/Flash topics - not
    // verified here).
    const COLLECT_TOPIC: &str =
        "0x70935338e69775456a85ddef226c395fb668b63fa0115f5f20610b388e6ca9c0";
    const FLASH_TOPIC: &str = "0xbdbdb71d7860376ba52b25a5028beea23581364a40522f6bcfb86bb1f2dca633";

    /// Strips the leading "0x" from one of the topic constants above.
    fn without_prefix(topic: &str) -> &str {
        topic
            .strip_prefix("0x")
            .expect("topic constants are 0x-prefixed")
    }

    #[fixture]
    fn manager() -> DefiDataSubscriptionManager {
        DefiDataSubscriptionManager::new()
    }

    #[fixture]
    fn registered_manager() -> DefiDataSubscriptionManager {
        let mut manager = DefiDataSubscriptionManager::new();
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_SIGNATURE,
            MINT_SIGNATURE,
            BURN_SIGNATURE,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );
        manager
    }

    #[rstest]
    fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
        let dex = DexType::UniswapV3;

        assert!(manager.get_subscribed_dex_contract_addresses(&dex).is_empty());
        assert!(manager.get_subscribed_dex_event_signatures(&dex).is_empty());
        assert!(manager.get_dex_pool_swap_event_signature(&dex).is_none());
        assert!(manager.get_dex_pool_mint_event_signature(&dex).is_none());
        assert!(manager.get_dex_pool_burn_event_signature(&dex).is_none());
        assert!(manager.get_dex_pool_collect_event_signature(&dex).is_none());
    }

    #[rstest]
    fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
        let dex = DexType::UniswapV3;

        // Should have all five event signatures (swap, mint, burn, collect, flash)
        let signatures = registered_manager.get_subscribed_dex_event_signatures(&dex);
        assert_eq!(signatures.len(), 5);

        // Each individually retrievable signature should be present
        assert!(
            registered_manager
                .get_dex_pool_swap_event_signature(&dex)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_mint_event_signature(&dex)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_burn_event_signature(&dex)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_collect_event_signature(&dex)
                .is_some()
        );
    }

    #[rstest]
    fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribe to swap events
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses, vec![pool_address]);
    }

    #[rstest]
    fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribing without registering only logs; it must not panic
        manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        manager.subscribe_mints(DexType::UniswapV3, pool_address);
        manager.subscribe_burns(DexType::UniswapV3, pool_address);

        // Nothing should have been recorded
        let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert!(addresses.is_empty());
    }

    #[rstest]
    fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
        let dex = DexType::UniswapV3;
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribe and verify the subscription is visible
        registered_manager.subscribe_swaps(dex, pool_address);
        assert_eq!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&dex)
                .len(),
            1
        );

        // Unsubscribe and verify removal
        registered_manager.unsubscribe_swaps(dex, pool_address);
        assert!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&dex)
                .is_empty()
        );
    }

    #[rstest]
    fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
        let dex = DexType::UniswapV3;
        let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&dex);
        let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&dex);
        let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&dex);

        // All should be Some and start with 0x
        assert!(swap_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(mint_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(burn_sig.is_some_and(|sig| sig.starts_with("0x")));
    }

    #[rstest]
    fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribe same address multiple times to same event type
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        // Should only appear once (set semantics)
        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 1);
    }

    #[rstest]
    fn test_get_combined_addresses_from_all_events(
        mut registered_manager: DefiDataSubscriptionManager,
    ) {
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");
        let pool3 = address!("3333333333333333333333333333333333333333");

        // Subscribe different pools to different events
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
        registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
        registered_manager.subscribe_burns(DexType::UniswapV3, pool3);

        // Should get all unique addresses
        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 3);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));
        assert!(addresses.contains(&pool3));
    }

    #[rstest]
    fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
        let swap_sig = registered_manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();

        // Should be a properly formatted hex string: 0x + 64 hex chars (32 bytes)
        assert!(swap_sig.starts_with("0x"));
        assert_eq!(swap_sig.len(), 66);

        // Verify it's valid hex
        let hex_part = &swap_sig[2..];
        assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[rstest]
    #[case(DexType::UniswapV3)]
    #[case(DexType::UniswapV2)]
    fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
        let mut manager = DefiDataSubscriptionManager::new();
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");

        // Step 1: Register DEX
        manager.register_dex_for_subscriptions(
            dex_type,
            "Swap(address,uint256,uint256)",
            "Mint(address,uint256)",
            "Burn(address,uint256)",
            "Collect(address,uint256,uint256)",
            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
        );

        // Step 2: Subscribe to events
        manager.subscribe_swaps(dex_type, pool1);
        manager.subscribe_swaps(dex_type, pool2);
        manager.subscribe_mints(dex_type, pool1);
        manager.subscribe_burns(dex_type, pool2);

        // Step 3: Verify subscriptions
        let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
        assert_eq!(addresses.len(), 2);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));

        // Step 4: Get event signatures
        let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
        assert_eq!(signatures.len(), 5);

        // Step 5: Unsubscribe from some events
        manager.unsubscribe_swaps(dex_type, pool1);
        manager.unsubscribe_burns(dex_type, pool2);

        // Step 6: Verify remaining subscriptions (pool1 mint and pool2 swap remain)
        let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
        assert_eq!(remaining.len(), 2);
        assert!(remaining.contains(&pool1)); // Still has mint subscription
        assert!(remaining.contains(&pool2)); // Still has swap subscription
    }

    #[rstest]
    fn test_register_with_raw_signatures() {
        let dex = DexType::UniswapV3;
        let mut manager = DefiDataSubscriptionManager::new();

        // Register with raw event signatures
        manager.register_dex_for_subscriptions(
            dex,
            SWAP_SIGNATURE,
            MINT_SIGNATURE,
            BURN_SIGNATURE,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        // Raw signatures must be hashed to the known keccak256 topics
        assert_eq!(
            manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
            SWAP_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
            MINT_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
            BURN_TOPIC
        );
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures() {
        let dex = DexType::UniswapV3;
        let mut manager = DefiDataSubscriptionManager::new();

        // Register with pre-encoded keccak256 hashes (with 0x prefix); the swap topic
        // uses upper-case hex digits to exercise lowercase normalization
        manager.register_dex_for_subscriptions(
            dex,
            "0xC42079F94A6350D7E6235F29174924F928CC2AC818EB64FED8004E115FBCCA67",
            MINT_TOPIC,
            BURN_TOPIC,
            COLLECT_TOPIC,
            Some(FLASH_TOPIC),
        );

        // Should be stored without re-hashing, normalized to lowercase
        assert_eq!(
            manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
            SWAP_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
            MINT_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
            BURN_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_collect_event_signature(&dex).unwrap(),
            COLLECT_TOPIC
        );
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures_no_prefix() {
        let dex = DexType::UniswapV3;
        let mut manager = DefiDataSubscriptionManager::new();

        // Register with pre-encoded hashes without 0x prefix; the mint topic is
        // upper-cased to exercise lowercase normalization on this path too
        let mint_upper = without_prefix(MINT_TOPIC).to_ascii_uppercase();
        manager.register_dex_for_subscriptions(
            dex,
            without_prefix(SWAP_TOPIC),
            &mint_upper,
            without_prefix(BURN_TOPIC),
            without_prefix(COLLECT_TOPIC),
            Some(without_prefix(FLASH_TOPIC)),
        );

        // Should add 0x prefix and normalize to lowercase
        assert_eq!(
            manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
            SWAP_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
            MINT_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
            BURN_TOPIC
        );
        assert_eq!(
            manager.get_dex_pool_collect_event_signature(&dex).unwrap(),
            COLLECT_TOPIC
        );
    }
}