nautilus_blockchain/data/
subscription.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
/// Manages subscriptions to DeFi protocol events (swaps, mints, burns, collects) across different DEXs.
///
/// This manager tracks which pool addresses are subscribed for each event type
/// and maintains the event signature encodings for efficient filtering.
///
/// Every map is keyed by [`DexType`]. The `pool_*_event_encoded` maps hold one
/// normalized `0x`-prefixed topic string per DEX, and the `subscribed_pool_*`
/// maps hold the set of pool addresses subscribed for that event type.
#[derive(Debug)]
pub struct DefiDataSubscriptionManager {
    /// Normalized swap event topic for each registered DEX.
    pool_swap_event_encoded: AHashMap<DexType, String>,
    /// Normalized mint event topic for each registered DEX.
    pool_mint_event_encoded: AHashMap<DexType, String>,
    /// Normalized burn event topic for each registered DEX.
    pool_burn_event_encoded: AHashMap<DexType, String>,
    /// Normalized collect event topic for each registered DEX.
    pool_collect_event_encoded: AHashMap<DexType, String>,
    /// Normalized flash event topic for each DEX registered with a flash signature.
    pool_flash_event_encoded: AHashMap<DexType, String>,
    /// Pool addresses subscribed to swap events, per DEX.
    subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to mint events, per DEX.
    subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to burn events, per DEX.
    subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to collect events, per DEX.
    subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
    /// Pool addresses subscribed to flash events, per DEX registered with a flash signature.
    subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
}
37
38impl Default for DefiDataSubscriptionManager {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl DefiDataSubscriptionManager {
45    /// Creates a new [`DefiDataSubscriptionManager`] instance.
46    #[must_use]
47    pub fn new() -> Self {
48        Self {
49            pool_swap_event_encoded: AHashMap::new(),
50            pool_burn_event_encoded: AHashMap::new(),
51            pool_mint_event_encoded: AHashMap::new(),
52            pool_collect_event_encoded: AHashMap::new(),
53            pool_flash_event_encoded: AHashMap::new(),
54            subscribed_pool_burns: AHashMap::new(),
55            subscribed_pool_mints: AHashMap::new(),
56            subscribed_pool_swaps: AHashMap::new(),
57            subscribed_pool_collects: AHashMap::new(),
58            subscribed_pool_flashes: AHashMap::new(),
59        }
60    }
61
62    /// Gets all unique contract addresses subscribed for any event type for a given DEX.
63    #[must_use]
64    pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
65        let mut unique_addresses = AHashSet::new();
66
67        if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
68            unique_addresses.extend(addresses.iter().copied());
69        }
70        if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
71            unique_addresses.extend(addresses.iter().copied());
72        }
73        if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
74            unique_addresses.extend(addresses.iter().copied());
75        }
76        if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
77            unique_addresses.extend(addresses.iter().copied());
78        }
79        if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
80            unique_addresses.extend(addresses.iter().copied());
81        }
82
83        unique_addresses.into_iter().collect()
84    }
85
86    /// Gets all event signatures (keccak256 hashes) registered for a given DEX.
87    #[must_use]
88    pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
89        let mut result = Vec::new();
90
91        if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
92            result.push(swap_event_signature.clone());
93        }
94        if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
95            result.push(mint_event_signature.clone());
96        }
97        if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
98            result.push(burn_event_signature.clone());
99        }
100        if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
101            result.push(collect_event_signature.clone());
102        }
103        if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
104            result.push(flash_event_signature.clone());
105        }
106
107        result
108    }
109
110    /// Gets the swap event signature for a specific DEX.
111    #[must_use]
112    pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
113        self.pool_swap_event_encoded.get(dex).cloned()
114    }
115
116    /// Gets the mint event signature for a specific DEX.
117    #[must_use]
118    pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
119        self.pool_mint_event_encoded.get(dex).cloned()
120    }
121    /// Gets the burn event signature for a specific DEX.
122    #[must_use]
123    pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
124        self.pool_burn_event_encoded.get(dex).cloned()
125    }
126
127    /// Gets the collect event signature for a specific DEX.
128    #[must_use]
129    pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
130        self.pool_collect_event_encoded.get(dex).cloned()
131    }
132
133    /// Normalizes an event signature to a consistent format.
134    ///
135    /// Accepts:
136    /// - A raw event signature like "Swap(address,address,int256,int256,uint160,uint128,int24)".
137    /// - A pre-encoded topic like "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67".
138    /// - A hex string without 0x prefix.
139    ///
140    /// Returns a normalized "0x..." format string.
141    fn normalize_topic(sig: &str) -> String {
142        let s = sig.trim();
143
144        // Check if it's already a properly formatted hex string with 0x prefix
145        if let Some(rest) = s.strip_prefix("0x")
146            && rest.len() == 64
147            && rest.chars().all(|c| c.is_ascii_hexdigit())
148        {
149            return format!("0x{}", rest.to_ascii_lowercase());
150        }
151
152        // Check if it's a hex string without 0x prefix
153        if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
154            return format!("0x{}", s.to_ascii_lowercase());
155        }
156
157        // Otherwise, it's a raw signature that needs hashing
158        format!("0x{}", hex::encode(keccak256(s.as_bytes())))
159    }
160
161    /// Registers a DEX with its event signatures for subscription management.
162    ///
163    /// This must be called before subscribing to any events for a DEX.
164    /// Event signatures can be either raw signatures or pre-encoded keccak256 hashes.
165    pub fn register_dex_for_subscriptions(
166        &mut self,
167        dex: DexType,
168        swap_event_signature: &str,
169        mint_event_signature: &str,
170        burn_event_signature: &str,
171        collect_event_signature: &str,
172        flash_event_signature: Option<&str>,
173    ) {
174        self.subscribed_pool_swaps.insert(dex, AHashSet::new());
175        self.pool_swap_event_encoded
176            .insert(dex, Self::normalize_topic(swap_event_signature));
177
178        self.subscribed_pool_mints.insert(dex, AHashSet::new());
179        self.pool_mint_event_encoded
180            .insert(dex, Self::normalize_topic(mint_event_signature));
181
182        self.subscribed_pool_burns.insert(dex, AHashSet::new());
183        self.pool_burn_event_encoded
184            .insert(dex, Self::normalize_topic(burn_event_signature));
185
186        self.subscribed_pool_collects.insert(dex, AHashSet::new());
187        self.pool_collect_event_encoded
188            .insert(dex, Self::normalize_topic(collect_event_signature));
189
190        if let Some(flash_event_signature) = flash_event_signature {
191            self.subscribed_pool_flashes.insert(dex, AHashSet::new());
192            self.pool_flash_event_encoded
193                .insert(dex, Self::normalize_topic(flash_event_signature));
194        }
195
196        tracing::info!("Registered DEX for subscriptions: {dex:?}");
197    }
198
199    /// Subscribes to swap events for a specific pool address on a DEX.
200    pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
201        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
202            pool_set.insert(address);
203        } else {
204            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
205        }
206    }
207
208    /// Subscribes to mint events for a specific pool address on a DEX.
209    pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
210        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
211            pool_set.insert(address);
212        } else {
213            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
214        }
215    }
216
217    /// Subscribes to burn events for a specific pool address on a DEX.
218    pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
219        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
220            pool_set.insert(address);
221        } else {
222            tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
223        }
224    }
225
226    /// Unsubscribes from swap events for a specific pool address on a DEX.
227    pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
228        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
229            pool_set.remove(&address);
230        } else {
231            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
232        }
233    }
234
235    /// Unsubscribes from mint events for a specific pool address on a DEX.
236    pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
237        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
238            pool_set.remove(&address);
239        } else {
240            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
241        }
242    }
243
244    /// Unsubscribes from burn events for a specific pool address on a DEX.
245    pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
246        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
247            pool_set.remove(&address);
248        } else {
249            tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
250        }
251    }
252
253    /// Subscribes to collect events for a specific pool address on a DEX.
254    pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
255        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
256            pool_set.insert(address);
257        } else {
258            tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
259        }
260    }
261
262    /// Unsubscribes from collect events for a specific pool address on a DEX.
263    pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
264        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
265            pool_set.remove(&address);
266        } else {
267            tracing::error!("DEX not registered for collect subscriptions: {dex:?}");
268        }
269    }
270
271    /// Subscribes to flash events for a specific pool address on a DEX.
272    pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
273        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
274            pool_set.insert(address);
275        } else {
276            tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
277        }
278    }
279
280    /// Unsubscribes from flash events for a specific pool address on a DEX.
281    pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
282        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
283            pool_set.remove(&address);
284        } else {
285            tracing::error!("DEX not registered for flash subscriptions: {dex:?}");
286        }
287    }
288}
289
290////////////////////////////////////////////////////////////////////////////////
291// Tests
292////////////////////////////////////////////////////////////////////////////////
293
#[cfg(test)]
mod tests {
    use alloy::primitives::address;
    use rstest::{fixture, rstest};

    use super::*;

    // Raw UniswapV3 pool event signatures.
    const V3_SWAP_EVENT: &str = "Swap(address,address,int256,int256,uint160,uint128,int24)";
    const V3_MINT_EVENT: &str = "Mint(address,address,int24,int24,uint128,uint256,uint256)";
    const V3_BURN_EVENT: &str = "Burn(address,int24,int24,uint128,uint256,uint256)";
    const V3_COLLECT_EVENT: &str = "Collect(address,address,int24,int24,uint128,uint128)";
    const V3_FLASH_EVENT: &str = "Flash(address,address,uint256,uint256,uint256,uint256)";

    // Pre-encoded (keccak256) topics for the UniswapV3 pool events above.
    const V3_SWAP_TOPIC: &str =
        "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67";
    const V3_MINT_TOPIC: &str =
        "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde";
    const V3_BURN_TOPIC: &str =
        "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c";
    const V3_COLLECT_TOPIC: &str =
        "0x70935338e69775456a85ddef226c395fb668b63fa0115f5f20610b388e6ca9c0";
    const V3_FLASH_TOPIC: &str =
        "0xbdbdb71d7860376ba52b25a5028beea23581364a40522f6bcfb86bb1f2dca633";

    /// Strips the `0x` prefix from a topic constant and uppercases the hex digits,
    /// producing an input that needs both prefixing and lowercasing to normalize.
    fn bare_uppercase(topic: &str) -> String {
        topic
            .strip_prefix("0x")
            .expect("topic constants are 0x-prefixed")
            .to_ascii_uppercase()
    }

    #[fixture]
    fn manager() -> DefiDataSubscriptionManager {
        DefiDataSubscriptionManager::new()
    }

    #[fixture]
    fn registered_manager() -> DefiDataSubscriptionManager {
        let mut manager = DefiDataSubscriptionManager::new();
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            V3_SWAP_EVENT,
            V3_MINT_EVENT,
            V3_BURN_EVENT,
            V3_COLLECT_EVENT,
            Some(V3_FLASH_EVENT),
        );
        manager
    }

    #[rstest]
    fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
        assert_eq!(
            manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .len(),
            0
        );
        assert_eq!(
            manager
                .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
                .len(),
            0
        );
        assert!(
            manager
                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_collect_event_signature(&DexType::UniswapV3)
                .is_none()
        );
    }

    #[rstest]
    fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
        // Should have all five event signatures (swap, mint, burn, collect, flash)
        let signatures =
            registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
        assert_eq!(signatures.len(), 5);

        // Each signature should be retrievable through its dedicated getter
        assert!(
            registered_manager
                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_collect_event_signature(&DexType::UniswapV3)
                .is_some()
        );
    }

    #[rstest]
    fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribe to swap events
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 1);
        assert_eq!(addresses[0], pool_address);
    }

    #[rstest]
    fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Try to subscribe without registering - should only log, never panic
        manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        manager.subscribe_mints(DexType::UniswapV3, pool_address);
        manager.subscribe_burns(DexType::UniswapV3, pool_address);

        // Should return empty results
        let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 0);
    }

    #[rstest]
    fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribe
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        // Verify subscription
        assert_eq!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .len(),
            1
        );

        // Unsubscribe
        registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);

        // Verify removal
        assert_eq!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .len(),
            0
        );
    }

    #[rstest]
    fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
        let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
        let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
        let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);

        // All should be Some and start with 0x
        assert!(swap_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(mint_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(burn_sig.is_some_and(|sig| sig.starts_with("0x")));
    }

    #[rstest]
    fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribe same address multiple times to same event type
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        // Should only appear once (HashSet behavior)
        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 1);
    }

    #[rstest]
    fn test_get_combined_addresses_from_all_events(
        mut registered_manager: DefiDataSubscriptionManager,
    ) {
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");
        let pool3 = address!("3333333333333333333333333333333333333333");

        // Subscribe different pools to different events
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
        registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
        registered_manager.subscribe_burns(DexType::UniswapV3, pool3);

        // Should get all unique addresses
        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 3);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));
        assert!(addresses.contains(&pool3));
    }

    #[rstest]
    fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
        // Swap(address,address,int256,int256,uint160,uint128,int24) for UniswapV3
        let swap_sig = registered_manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();

        // Should be properly formatted hex string
        assert!(swap_sig.starts_with("0x"));
        assert_eq!(swap_sig.len(), 66); // 0x + 64 hex chars (32 bytes)

        // Verify it's valid hex
        let hex_part = &swap_sig[2..];
        assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[rstest]
    #[case(DexType::UniswapV3)]
    #[case(DexType::UniswapV2)]
    fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
        let mut manager = DefiDataSubscriptionManager::new();
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");

        // Step 1: Register DEX
        manager.register_dex_for_subscriptions(
            dex_type,
            "Swap(address,uint256,uint256)",
            "Mint(address,uint256)",
            "Burn(address,uint256)",
            "Collect(address,uint256,uint256)",
            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
        );

        // Step 2: Subscribe to events
        manager.subscribe_swaps(dex_type, pool1);
        manager.subscribe_swaps(dex_type, pool2);
        manager.subscribe_mints(dex_type, pool1);
        manager.subscribe_burns(dex_type, pool2);

        // Step 3: Verify subscriptions
        let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
        assert_eq!(addresses.len(), 2);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));

        // Step 4: Get event signatures
        let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
        assert_eq!(signatures.len(), 5);

        // Step 5: Unsubscribe from some events
        manager.unsubscribe_swaps(dex_type, pool1);
        manager.unsubscribe_burns(dex_type, pool2);

        // Step 6: Verify remaining subscriptions (pool1 mint and pool2 swap remain)
        let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
        assert_eq!(remaining.len(), 2);
        assert!(remaining.contains(&pool1)); // Still has mint subscription
        assert!(remaining.contains(&pool2)); // Still has swap subscription
    }

    #[rstest]
    fn test_register_with_raw_signatures() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Register with raw event signatures
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            V3_SWAP_EVENT,
            V3_MINT_EVENT,
            V3_BURN_EVENT,
            V3_COLLECT_EVENT,
            Some(V3_FLASH_EVENT),
        );

        // Raw signatures must hash to the known keccak256 topics for UniswapV3 events
        let swap_sig = manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();
        let mint_sig = manager
            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
            .unwrap();
        let burn_sig = manager
            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
            .unwrap();

        // Verify the exact hash values
        assert_eq!(swap_sig, V3_SWAP_TOPIC);
        assert_eq!(mint_sig, V3_MINT_TOPIC);
        assert_eq!(burn_sig, V3_BURN_TOPIC);
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Register with pre-encoded keccak256 hashes (with 0x prefix)
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            V3_SWAP_TOPIC,
            V3_MINT_TOPIC,
            V3_BURN_TOPIC,
            V3_COLLECT_TOPIC,
            Some(V3_FLASH_TOPIC),
        );

        // Should store them unchanged (already lowercase and 0x-prefixed), not re-hash them
        let swap_sig = manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();
        let mint_sig = manager
            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
            .unwrap();
        let burn_sig = manager
            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
            .unwrap();
        let collect_sig = manager
            .get_dex_pool_collect_event_signature(&DexType::UniswapV3)
            .unwrap();

        assert_eq!(swap_sig, V3_SWAP_TOPIC);
        assert_eq!(mint_sig, V3_MINT_TOPIC);
        assert_eq!(burn_sig, V3_BURN_TOPIC);
        assert_eq!(collect_sig, V3_COLLECT_TOPIC);
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures_no_prefix() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Register with pre-encoded hashes without 0x prefix, in uppercase hex
        let swap = bare_uppercase(V3_SWAP_TOPIC);
        let mint = bare_uppercase(V3_MINT_TOPIC);
        let burn = bare_uppercase(V3_BURN_TOPIC);
        let collect = bare_uppercase(V3_COLLECT_TOPIC);
        let flash = bare_uppercase(V3_FLASH_TOPIC);
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            &swap,
            &mint,
            &burn,
            &collect,
            Some(flash.as_str()),
        );

        // Should add 0x prefix and normalize to lowercase
        let swap_sig = manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();
        let mint_sig = manager
            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
            .unwrap();
        let burn_sig = manager
            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
            .unwrap();
        let collect_sig = manager
            .get_dex_pool_collect_event_signature(&DexType::UniswapV3)
            .unwrap();

        assert_eq!(swap_sig, V3_SWAP_TOPIC);
        assert_eq!(mint_sig, V3_MINT_TOPIC);
        assert_eq!(burn_sig, V3_BURN_TOPIC);
        assert_eq!(collect_sig, V3_COLLECT_TOPIC);
    }
}