nautilus_blockchain/data/
subscription.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
20/// Manages subscriptions to DeFi protocol events (swaps, mints, burns, collects) across different DEXs.
21///
22/// This manager tracks which pool addresses are subscribed for each event type
23/// and maintains the event signature encodings for efficient filtering.
24#[derive(Debug)]
25pub struct DefiDataSubscriptionManager {
26    pool_swap_event_encoded: AHashMap<DexType, String>,
27    pool_mint_event_encoded: AHashMap<DexType, String>,
28    pool_burn_event_encoded: AHashMap<DexType, String>,
29    pool_collect_event_encoded: AHashMap<DexType, String>,
30    pool_flash_event_encoded: AHashMap<DexType, String>,
31    subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
32    subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
33    subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
34    subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
35    subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
36}
37
38impl Default for DefiDataSubscriptionManager {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl DefiDataSubscriptionManager {
45    /// Creates a new [`DefiDataSubscriptionManager`] instance.
46    #[must_use]
47    pub fn new() -> Self {
48        Self {
49            pool_swap_event_encoded: AHashMap::new(),
50            pool_burn_event_encoded: AHashMap::new(),
51            pool_mint_event_encoded: AHashMap::new(),
52            pool_collect_event_encoded: AHashMap::new(),
53            pool_flash_event_encoded: AHashMap::new(),
54            subscribed_pool_burns: AHashMap::new(),
55            subscribed_pool_mints: AHashMap::new(),
56            subscribed_pool_swaps: AHashMap::new(),
57            subscribed_pool_collects: AHashMap::new(),
58            subscribed_pool_flashes: AHashMap::new(),
59        }
60    }
61
62    /// Gets all unique contract addresses subscribed for any event type for a given DEX.
63    #[must_use]
64    pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
65        let mut unique_addresses = AHashSet::new();
66
67        if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
68            unique_addresses.extend(addresses.iter().copied());
69        }
70        if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
71            unique_addresses.extend(addresses.iter().copied());
72        }
73        if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
74            unique_addresses.extend(addresses.iter().copied());
75        }
76        if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
77            unique_addresses.extend(addresses.iter().copied());
78        }
79        if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
80            unique_addresses.extend(addresses.iter().copied());
81        }
82
83        unique_addresses.into_iter().collect()
84    }
85
86    /// Gets all event signatures (keccak256 hashes) registered for a given DEX.
87    #[must_use]
88    pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
89        let mut result = Vec::new();
90
91        if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
92            result.push(swap_event_signature.clone());
93        }
94        if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
95            result.push(mint_event_signature.clone());
96        }
97        if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
98            result.push(burn_event_signature.clone());
99        }
100        if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
101            result.push(collect_event_signature.clone());
102        }
103        if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
104            result.push(flash_event_signature.clone());
105        }
106
107        result
108    }
109
110    /// Gets the swap event signature for a specific DEX.
111    #[must_use]
112    pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
113        self.pool_swap_event_encoded.get(dex).cloned()
114    }
115
116    /// Gets the mint event signature for a specific DEX.
117    #[must_use]
118    pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
119        self.pool_mint_event_encoded.get(dex).cloned()
120    }
121    /// Gets the burn event signature for a specific DEX.
122    #[must_use]
123    pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
124        self.pool_burn_event_encoded.get(dex).cloned()
125    }
126
127    /// Gets the collect event signature for a specific DEX.
128    #[must_use]
129    pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
130        self.pool_collect_event_encoded.get(dex).cloned()
131    }
132
133    /// Normalizes an event signature to a consistent format.
134    ///
135    /// Accepts:
136    /// - A raw event signature like "Swap(address,address,int256,int256,uint160,uint128,int24)".
137    /// - A pre-encoded topic like "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67".
138    /// - A hex string without 0x prefix.
139    ///
140    /// Returns a normalized "0x..." format string.
141    fn normalize_topic(sig: &str) -> String {
142        let s = sig.trim();
143
144        // Check if it's already a properly formatted hex string with 0x prefix
145        if let Some(rest) = s.strip_prefix("0x") {
146            if rest.len() == 64 && rest.chars().all(|c| c.is_ascii_hexdigit()) {
147                return format!("0x{}", rest.to_ascii_lowercase());
148            }
149        }
150
151        // Check if it's a hex string without 0x prefix
152        if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
153            return format!("0x{}", s.to_ascii_lowercase());
154        }
155
156        // Otherwise, it's a raw signature that needs hashing
157        format!("0x{}", hex::encode(keccak256(s.as_bytes())))
158    }
159
160    /// Registers a DEX with its event signatures for subscription management.
161    ///
162    /// This must be called before subscribing to any events for a DEX.
163    /// Event signatures can be either raw signatures or pre-encoded keccak256 hashes.
164    pub fn register_dex_for_subscriptions(
165        &mut self,
166        dex: DexType,
167        swap_event_signature: &str,
168        mint_event_signature: &str,
169        burn_event_signature: &str,
170        collect_event_signature: &str,
171        flash_event_signature: Option<&str>,
172    ) {
173        self.subscribed_pool_swaps.insert(dex, AHashSet::new());
174        self.pool_swap_event_encoded
175            .insert(dex, Self::normalize_topic(swap_event_signature));
176
177        self.subscribed_pool_mints.insert(dex, AHashSet::new());
178        self.pool_mint_event_encoded
179            .insert(dex, Self::normalize_topic(mint_event_signature));
180
181        self.subscribed_pool_burns.insert(dex, AHashSet::new());
182        self.pool_burn_event_encoded
183            .insert(dex, Self::normalize_topic(burn_event_signature));
184
185        self.subscribed_pool_collects.insert(dex, AHashSet::new());
186        self.pool_collect_event_encoded
187            .insert(dex, Self::normalize_topic(collect_event_signature));
188
189        if let Some(flash_event_signature) = flash_event_signature {
190            self.subscribed_pool_flashes.insert(dex, AHashSet::new());
191            self.pool_flash_event_encoded
192                .insert(dex, Self::normalize_topic(flash_event_signature));
193        }
194
195        tracing::info!("Registered DEX for subscriptions: {dex:?}");
196    }
197
198    /// Subscribes to swap events for a specific pool address on a DEX.
199    pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
200        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
201            pool_set.insert(address);
202        } else {
203            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
204        }
205    }
206
207    /// Subscribes to mint events for a specific pool address on a DEX.
208    pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
209        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
210            pool_set.insert(address);
211        } else {
212            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
213        }
214    }
215
216    /// Subscribes to burn events for a specific pool address on a DEX.
217    pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
218        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
219            pool_set.insert(address);
220        } else {
221            tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
222        }
223    }
224
225    /// Unsubscribes from swap events for a specific pool address on a DEX.
226    pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
227        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
228            pool_set.remove(&address);
229        } else {
230            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
231        }
232    }
233
234    /// Unsubscribes from mint events for a specific pool address on a DEX.
235    pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
236        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
237            pool_set.remove(&address);
238        } else {
239            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
240        }
241    }
242
243    /// Unsubscribes from burn events for a specific pool address on a DEX.
244    pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
245        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
246            pool_set.remove(&address);
247        } else {
248            tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
249        }
250    }
251
252    /// Subscribes to collect events for a specific pool address on a DEX.
253    pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
254        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
255            pool_set.insert(address);
256        } else {
257            tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
258        }
259    }
260
261    /// Unsubscribes from collect events for a specific pool address on a DEX.
262    pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
263        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
264            pool_set.remove(&address);
265        } else {
266            tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
267        }
268    }
269
270    /// Subscribes to flash events for a specific pool address on a DEX.
271    pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
272        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
273            pool_set.insert(address);
274        } else {
275            tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
276        }
277    }
278
279    /// Unsubscribes from flash events for a specific pool address on a DEX.
280    pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
281        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
282            pool_set.remove(&address);
283        } else {
284            tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
285        }
286    }
287}
288
289////////////////////////////////////////////////////////////////////////////////
290// Tests
291////////////////////////////////////////////////////////////////////////////////
292
293#[cfg(test)]
294mod tests {
295    use alloy::primitives::address;
296    use rstest::{fixture, rstest};
297
298    use super::*;
299
300    #[fixture]
301    fn manager() -> DefiDataSubscriptionManager {
302        DefiDataSubscriptionManager::new()
303    }
304
305    #[fixture]
306    fn registered_manager() -> DefiDataSubscriptionManager {
307        let mut manager = DefiDataSubscriptionManager::new();
308        manager.register_dex_for_subscriptions(
309            DexType::UniswapV3,
310            "Swap(address,address,int256,int256,uint160,uint128,int24)",
311            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
312            "Burn(address,int24,int24,uint128,uint256,uint256)",
313            "Collect(address,address,int24,int24,uint128,uint128)",
314            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
315        );
316        manager
317    }
318
319    #[rstest]
320    fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
321        assert_eq!(
322            manager
323                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
324                .len(),
325            0
326        );
327        assert_eq!(
328            manager
329                .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
330                .len(),
331            0
332        );
333        assert!(
334            manager
335                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
336                .is_none()
337        );
338        assert!(
339            manager
340                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
341                .is_none()
342        );
343        assert!(
344            manager
345                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
346                .is_none()
347        );
348    }
349
350    #[rstest]
351    fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
352        // Should have all four event signatures
353        let signatures =
354            registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
355        assert_eq!(signatures.len(), 5);
356
357        // Each signature should be properly encoded
358        assert!(
359            registered_manager
360                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
361                .is_some()
362        );
363        assert!(
364            registered_manager
365                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
366                .is_some()
367        );
368        assert!(
369            registered_manager
370                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
371                .is_some()
372        );
373    }
374
375    #[rstest]
376    fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
377        let pool_address = address!("1234567890123456789012345678901234567890");
378
379        // Subscribe to swap events
380        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
381
382        let addresses =
383            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
384        assert_eq!(addresses.len(), 1);
385        assert_eq!(addresses[0], pool_address);
386    }
387
388    #[rstest]
389    fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
390        let pool_address = address!("1234567890123456789012345678901234567890");
391
392        // Try to subscribe without registering - should log warning but not panic
393        manager.subscribe_swaps(DexType::UniswapV3, pool_address);
394        manager.subscribe_mints(DexType::UniswapV3, pool_address);
395        manager.subscribe_burns(DexType::UniswapV3, pool_address);
396
397        // Should return empty results
398        let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
399        assert_eq!(addresses.len(), 0);
400    }
401
402    #[rstest]
403    fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
404        let pool_address = address!("1234567890123456789012345678901234567890");
405
406        // Subscribe
407        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
408
409        // Verify subscription
410        assert_eq!(
411            registered_manager
412                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
413                .len(),
414            1
415        );
416
417        // Unsubscribe
418        registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
419
420        // Verify removal
421        assert_eq!(
422            registered_manager
423                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
424                .len(),
425            0
426        );
427    }
428
429    #[rstest]
430    fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
431        let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
432        let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
433        let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);
434
435        // All should be Some and start with 0x
436        assert!(swap_sig.is_some() && swap_sig.unwrap().starts_with("0x"));
437        assert!(mint_sig.is_some() && mint_sig.unwrap().starts_with("0x"));
438        assert!(burn_sig.is_some() && burn_sig.unwrap().starts_with("0x"));
439    }
440
441    #[rstest]
442    fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
443        let pool_address = address!("1234567890123456789012345678901234567890");
444
445        // Subscribe same address multiple times to same event type
446        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
447        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
448
449        // Should only appear once (HashSet behavior)
450        let addresses =
451            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
452        assert_eq!(addresses.len(), 1);
453    }
454
455    #[rstest]
456    fn test_get_combined_addresses_from_all_events(
457        mut registered_manager: DefiDataSubscriptionManager,
458    ) {
459        let pool1 = address!("1111111111111111111111111111111111111111");
460        let pool2 = address!("2222222222222222222222222222222222222222");
461        let pool3 = address!("3333333333333333333333333333333333333333");
462
463        // Subscribe different pools to different events
464        registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
465        registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
466        registered_manager.subscribe_burns(DexType::UniswapV3, pool3);
467
468        // Should get all unique addresses
469        let addresses =
470            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
471        assert_eq!(addresses.len(), 3);
472        assert!(addresses.contains(&pool1));
473        assert!(addresses.contains(&pool2));
474        assert!(addresses.contains(&pool3));
475    }
476
477    #[rstest]
478    fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
479        // Known event signature and its expected keccak256 hash
480        // Swap(address,address,int256,int256,uint160,uint128,int24) for UniswapV3
481        let swap_sig = registered_manager
482            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
483            .unwrap();
484
485        // Should be properly formatted hex string
486        assert!(swap_sig.starts_with("0x"));
487        assert_eq!(swap_sig.len(), 66); // 0x + 64 hex chars (32 bytes)
488
489        // Verify it's valid hex
490        let hex_part = &swap_sig[2..];
491        assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
492    }
493
494    #[rstest]
495    #[case(DexType::UniswapV3)]
496    #[case(DexType::UniswapV2)]
497    fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
498        let mut manager = DefiDataSubscriptionManager::new();
499        let pool1 = address!("1111111111111111111111111111111111111111");
500        let pool2 = address!("2222222222222222222222222222222222222222");
501
502        // Step 1: Register DEX
503        manager.register_dex_for_subscriptions(
504            dex_type,
505            "Swap(address,uint256,uint256)",
506            "Mint(address,uint256)",
507            "Burn(address,uint256)",
508            "Collect(address,uint256,uint256)",
509            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
510        );
511
512        // Step 2: Subscribe to events
513        manager.subscribe_swaps(dex_type, pool1);
514        manager.subscribe_swaps(dex_type, pool2);
515        manager.subscribe_mints(dex_type, pool1);
516        manager.subscribe_burns(dex_type, pool2);
517
518        // Step 3: Verify subscriptions
519        let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
520        assert_eq!(addresses.len(), 2);
521        assert!(addresses.contains(&pool1));
522        assert!(addresses.contains(&pool2));
523
524        // Step 4: Get event signatures
525        let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
526        assert_eq!(signatures.len(), 5);
527
528        // Step 5: Unsubscribe from some events
529        manager.unsubscribe_swaps(dex_type, pool1);
530        manager.unsubscribe_burns(dex_type, pool2);
531
532        // Step 6: Verify remaining subscriptions (only pool1 mint remains)
533        let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
534        assert!(remaining.contains(&pool1)); // Still has mint subscription
535        assert!(remaining.contains(&pool2)); // Still has swap subscription
536    }
537
538    #[rstest]
539    fn test_register_with_raw_signatures() {
540        let mut manager = DefiDataSubscriptionManager::new();
541
542        // Register with raw event signatures
543        manager.register_dex_for_subscriptions(
544            DexType::UniswapV3,
545            "Swap(address,address,int256,int256,uint160,uint128,int24)",
546            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
547            "Burn(address,int24,int24,uint128,uint256,uint256)",
548            "Collect(address,address,int24,int24,uint128,uint128)",
549            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
550        );
551
552        // Known keccak256 hashes for UniswapV3 events
553        let swap_sig = manager
554            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
555            .unwrap();
556        let mint_sig = manager
557            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
558            .unwrap();
559        let burn_sig = manager
560            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
561            .unwrap();
562
563        // Verify the exact hash values
564        assert_eq!(
565            swap_sig,
566            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
567        );
568        assert_eq!(
569            mint_sig,
570            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
571        );
572        assert_eq!(
573            burn_sig,
574            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
575        );
576    }
577
578    #[rstest]
579    fn test_register_with_pre_encoded_signatures() {
580        let mut manager = DefiDataSubscriptionManager::new();
581
582        // Register with pre-encoded keccak256 hashes (with 0x prefix)
583        manager.register_dex_for_subscriptions(
584            DexType::UniswapV3,
585            "Swap(address,address,int256,int256,uint160,uint128,int24)",
586            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
587            "Burn(address,int24,int24,uint128,uint256,uint256)",
588            "Collect(address,address,int24,int24,uint128,uint128)",
589            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
590        );
591
592        // Should store them unchanged (normalized to lowercase)
593        let swap_sig = manager
594            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
595            .unwrap();
596        let mint_sig = manager
597            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
598            .unwrap();
599        let burn_sig = manager
600            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
601            .unwrap();
602
603        assert_eq!(
604            swap_sig,
605            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
606        );
607        assert_eq!(
608            mint_sig,
609            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
610        );
611        assert_eq!(
612            burn_sig,
613            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
614        );
615    }
616
617    #[rstest]
618    fn test_register_with_pre_encoded_signatures_no_prefix() {
619        let mut manager = DefiDataSubscriptionManager::new();
620
621        // Register with pre-encoded hashes without 0x prefix
622        manager.register_dex_for_subscriptions(
623            DexType::UniswapV3,
624            "Swap(address,address,int256,int256,uint160,uint128,int24)",
625            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
626            "Burn(address,int24,int24,uint128,uint256,uint256)",
627            "Collect(address,address,int24,int24,uint128,uint128)",
628            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
629        );
630
631        // Should add 0x prefix and normalize to lowercase
632        let swap_sig = manager
633            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
634            .unwrap();
635        let mint_sig = manager
636            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
637            .unwrap();
638        let burn_sig = manager
639            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
640            .unwrap();
641
642        assert_eq!(
643            swap_sig,
644            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
645        );
646        assert_eq!(
647            mint_sig,
648            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
649        );
650        assert_eq!(
651            burn_sig,
652            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
653        );
654    }
655}