nautilus_blockchain/data/
subscription.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
20/// Manages subscriptions to DeFi protocol events (swaps, mints, burns) across different DEXs.
21///
22/// This manager tracks which pool addresses are subscribed for each event type
23/// and maintains the event signature encodings for efficient filtering.
24#[derive(Debug)]
25pub struct DefiDataSubscriptionManager {
26    subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
27    pool_swap_event_encoded: AHashMap<DexType, String>,
28    subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
29    pool_mint_event_encoded: AHashMap<DexType, String>,
30    subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
31    pool_burn_event_encoded: AHashMap<DexType, String>,
32}
33
34impl Default for DefiDataSubscriptionManager {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl DefiDataSubscriptionManager {
41    /// Creates a new [`DefiDataSubscriptionManager`] instance.
42    #[must_use]
43    pub fn new() -> Self {
44        Self {
45            subscribed_pool_burns: AHashMap::new(),
46            subscribed_pool_mints: AHashMap::new(),
47            subscribed_pool_swaps: AHashMap::new(),
48            pool_swap_event_encoded: AHashMap::new(),
49            pool_burn_event_encoded: AHashMap::new(),
50            pool_mint_event_encoded: AHashMap::new(),
51        }
52    }
53
54    /// Gets all unique contract addresses subscribed for any event type for a given DEX.
55    #[must_use]
56    pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
57        let mut unique_addresses = AHashSet::new();
58
59        if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
60            unique_addresses.extend(addresses.iter().copied());
61        }
62        if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
63            unique_addresses.extend(addresses.iter().copied());
64        }
65        if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
66            unique_addresses.extend(addresses.iter().copied());
67        }
68
69        unique_addresses.into_iter().collect()
70    }
71
72    /// Gets all event signatures (keccak256 hashes) registered for a given DEX.
73    #[must_use]
74    pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
75        let mut result = Vec::new();
76
77        if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
78            result.push(swap_event_signature.clone());
79        }
80        if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
81            result.push(mint_event_signature.clone());
82        }
83        if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
84            result.push(burn_event_signature.clone());
85        }
86
87        result
88    }
89
90    /// Gets the swap event signature for a specific DEX.
91    #[must_use]
92    pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
93        self.pool_swap_event_encoded.get(dex).cloned()
94    }
95
96    /// Gets the mint event signature for a specific DEX.
97    #[must_use]
98    pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
99        self.pool_mint_event_encoded.get(dex).cloned()
100    }
101    /// Gets the burn event signature for a specific DEX.
102    #[must_use]
103    pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
104        self.pool_burn_event_encoded.get(dex).cloned()
105    }
106
107    /// Normalizes an event signature to a consistent format.
108    ///
109    /// Accepts:
110    /// - A raw event signature like "Swap(address,address,int256,int256,uint160,uint128,int24)".
111    /// - A pre-encoded topic like "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67".
112    /// - A hex string without 0x prefix.
113    ///
114    /// Returns a normalized "0x..." format string.
115    fn normalize_topic(sig: &str) -> String {
116        let s = sig.trim();
117
118        // Check if it's already a properly formatted hex string with 0x prefix
119        if let Some(rest) = s.strip_prefix("0x") {
120            if rest.len() == 64 && rest.chars().all(|c| c.is_ascii_hexdigit()) {
121                return format!("0x{}", rest.to_ascii_lowercase());
122            }
123        }
124
125        // Check if it's a hex string without 0x prefix
126        if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
127            return format!("0x{}", s.to_ascii_lowercase());
128        }
129
130        // Otherwise, it's a raw signature that needs hashing
131        format!("0x{}", hex::encode(keccak256(s.as_bytes())))
132    }
133
134    /// Registers a DEX with its event signatures for subscription management.
135    ///
136    /// This must be called before subscribing to any events for a DEX.
137    /// Event signatures can be either raw signatures or pre-encoded keccak256 hashes.
138    pub fn register_dex_for_subscriptions(
139        &mut self,
140        dex: DexType,
141        swap_event_signature: &str,
142        mint_event_signature: &str,
143        burn_event_signature: &str,
144    ) {
145        self.subscribed_pool_swaps.insert(dex, AHashSet::new());
146        self.pool_swap_event_encoded
147            .insert(dex, Self::normalize_topic(swap_event_signature));
148
149        self.subscribed_pool_mints.insert(dex, AHashSet::new());
150        self.pool_mint_event_encoded
151            .insert(dex, Self::normalize_topic(mint_event_signature));
152
153        self.subscribed_pool_burns.insert(dex, AHashSet::new());
154        self.pool_burn_event_encoded
155            .insert(dex, Self::normalize_topic(burn_event_signature));
156
157        tracing::info!("Registered DEX for subscriptions: {dex:?}");
158    }
159
160    /// Subscribes to swap events for a specific pool address on a DEX.
161    pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
162        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
163            pool_set.insert(address);
164        } else {
165            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
166        }
167    }
168
169    /// Subscribes to mint events for a specific pool address on a DEX.
170    pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
171        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
172            pool_set.insert(address);
173        } else {
174            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
175        }
176    }
177
178    /// Subscribes to burn events for a specific pool address on a DEX.
179    pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
180        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
181            pool_set.insert(address);
182        } else {
183            tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
184        }
185    }
186
187    /// Unsubscribes from swap events for a specific pool address on a DEX.
188    pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
189        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
190            pool_set.remove(&address);
191        } else {
192            tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
193        }
194    }
195
196    /// Unsubscribes from mint events for a specific pool address on a DEX.
197    pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
198        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
199            pool_set.remove(&address);
200        } else {
201            tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
202        }
203    }
204
205    /// Unsubscribes from burn events for a specific pool address on a DEX.
206    pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
207        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
208            pool_set.remove(&address);
209        } else {
210            tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
211        }
212    }
213}
214
215////////////////////////////////////////////////////////////////////////////////
216// Tests
217////////////////////////////////////////////////////////////////////////////////
218
219#[cfg(test)]
220mod tests {
221    use alloy::primitives::address;
222    use nautilus_model::defi::DexType;
223    use rstest::{fixture, rstest};
224
225    use super::*;
226
227    #[fixture]
228    fn manager() -> DefiDataSubscriptionManager {
229        DefiDataSubscriptionManager::new()
230    }
231
232    #[fixture]
233    fn registered_manager() -> DefiDataSubscriptionManager {
234        let mut manager = DefiDataSubscriptionManager::new();
235        manager.register_dex_for_subscriptions(
236            DexType::UniswapV3,
237            "Swap(address,address,int256,int256,uint160,uint128,int24)",
238            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
239            "Burn(address,int24,int24,uint128,uint256,uint256)",
240        );
241        manager
242    }
243
244    #[rstest]
245    fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
246        assert_eq!(
247            manager
248                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
249                .len(),
250            0
251        );
252        assert_eq!(
253            manager
254                .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
255                .len(),
256            0
257        );
258        assert!(
259            manager
260                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
261                .is_none()
262        );
263        assert!(
264            manager
265                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
266                .is_none()
267        );
268        assert!(
269            manager
270                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
271                .is_none()
272        );
273    }
274
275    #[rstest]
276    fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
277        // Should have all three event signatures
278        let signatures =
279            registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
280        assert_eq!(signatures.len(), 3);
281
282        // Each signature should be properly encoded
283        assert!(
284            registered_manager
285                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
286                .is_some()
287        );
288        assert!(
289            registered_manager
290                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
291                .is_some()
292        );
293        assert!(
294            registered_manager
295                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
296                .is_some()
297        );
298    }
299
300    #[rstest]
301    fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
302        let pool_address = address!("1234567890123456789012345678901234567890");
303
304        // Subscribe to swap events
305        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
306
307        let addresses =
308            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
309        assert_eq!(addresses.len(), 1);
310        assert_eq!(addresses[0], pool_address);
311    }
312
313    #[rstest]
314    fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
315        let pool_address = address!("1234567890123456789012345678901234567890");
316
317        // Try to subscribe without registering - should log warning but not panic
318        manager.subscribe_swaps(DexType::UniswapV3, pool_address);
319        manager.subscribe_mints(DexType::UniswapV3, pool_address);
320        manager.subscribe_burns(DexType::UniswapV3, pool_address);
321
322        // Should return empty results
323        let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
324        assert_eq!(addresses.len(), 0);
325    }
326
327    #[rstest]
328    fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
329        let pool_address = address!("1234567890123456789012345678901234567890");
330
331        // Subscribe
332        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
333
334        // Verify subscription
335        assert_eq!(
336            registered_manager
337                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
338                .len(),
339            1
340        );
341
342        // Unsubscribe
343        registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
344
345        // Verify removal
346        assert_eq!(
347            registered_manager
348                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
349                .len(),
350            0
351        );
352    }
353
354    #[rstest]
355    fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
356        let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
357        let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
358        let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);
359
360        // All should be Some and start with 0x
361        assert!(swap_sig.is_some() && swap_sig.unwrap().starts_with("0x"));
362        assert!(mint_sig.is_some() && mint_sig.unwrap().starts_with("0x"));
363        assert!(burn_sig.is_some() && burn_sig.unwrap().starts_with("0x"));
364    }
365
366    #[rstest]
367    fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
368        let pool_address = address!("1234567890123456789012345678901234567890");
369
370        // Subscribe same address multiple times to same event type
371        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
372        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
373
374        // Should only appear once (HashSet behavior)
375        let addresses =
376            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
377        assert_eq!(addresses.len(), 1);
378    }
379
380    #[rstest]
381    fn test_get_combined_addresses_from_all_events(
382        mut registered_manager: DefiDataSubscriptionManager,
383    ) {
384        let pool1 = address!("1111111111111111111111111111111111111111");
385        let pool2 = address!("2222222222222222222222222222222222222222");
386        let pool3 = address!("3333333333333333333333333333333333333333");
387
388        // Subscribe different pools to different events
389        registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
390        registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
391        registered_manager.subscribe_burns(DexType::UniswapV3, pool3);
392
393        // Should get all unique addresses
394        let addresses =
395            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
396        assert_eq!(addresses.len(), 3);
397        assert!(addresses.contains(&pool1));
398        assert!(addresses.contains(&pool2));
399        assert!(addresses.contains(&pool3));
400    }
401
402    #[rstest]
403    fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
404        // Known event signature and its expected keccak256 hash
405        // Swap(address,address,int256,int256,uint160,uint128,int24) for UniswapV3
406        let swap_sig = registered_manager
407            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
408            .unwrap();
409
410        // Should be properly formatted hex string
411        assert!(swap_sig.starts_with("0x"));
412        assert_eq!(swap_sig.len(), 66); // 0x + 64 hex chars (32 bytes)
413
414        // Verify it's valid hex
415        let hex_part = &swap_sig[2..];
416        assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
417    }
418
419    #[rstest]
420    #[case(DexType::UniswapV3)]
421    #[case(DexType::UniswapV2)]
422    fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
423        let mut manager = DefiDataSubscriptionManager::new();
424        let pool1 = address!("1111111111111111111111111111111111111111");
425        let pool2 = address!("2222222222222222222222222222222222222222");
426
427        // Step 1: Register DEX
428        manager.register_dex_for_subscriptions(
429            dex_type,
430            "Swap(address,uint256,uint256)",
431            "Mint(address,uint256)",
432            "Burn(address,uint256)",
433        );
434
435        // Step 2: Subscribe to events
436        manager.subscribe_swaps(dex_type, pool1);
437        manager.subscribe_swaps(dex_type, pool2);
438        manager.subscribe_mints(dex_type, pool1);
439        manager.subscribe_burns(dex_type, pool2);
440
441        // Step 3: Verify subscriptions
442        let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
443        assert_eq!(addresses.len(), 2);
444        assert!(addresses.contains(&pool1));
445        assert!(addresses.contains(&pool2));
446
447        // Step 4: Get event signatures
448        let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
449        assert_eq!(signatures.len(), 3);
450
451        // Step 5: Unsubscribe from some events
452        manager.unsubscribe_swaps(dex_type, pool1);
453        manager.unsubscribe_burns(dex_type, pool2);
454
455        // Step 6: Verify remaining subscriptions (only pool1 mint remains)
456        let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
457        assert!(remaining.contains(&pool1)); // Still has mint subscription
458        assert!(remaining.contains(&pool2)); // Still has swap subscription
459    }
460
461    #[rstest]
462    fn test_register_with_raw_signatures() {
463        let mut manager = DefiDataSubscriptionManager::new();
464
465        // Register with raw event signatures
466        manager.register_dex_for_subscriptions(
467            DexType::UniswapV3,
468            "Swap(address,address,int256,int256,uint160,uint128,int24)",
469            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
470            "Burn(address,int24,int24,uint128,uint256,uint256)",
471        );
472
473        // Known keccak256 hashes for UniswapV3 events
474        let swap_sig = manager
475            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
476            .unwrap();
477        let mint_sig = manager
478            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
479            .unwrap();
480        let burn_sig = manager
481            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
482            .unwrap();
483
484        // Verify the exact hash values
485        assert_eq!(
486            swap_sig,
487            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
488        );
489        assert_eq!(
490            mint_sig,
491            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
492        );
493        assert_eq!(
494            burn_sig,
495            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
496        );
497    }
498
499    #[rstest]
500    fn test_register_with_pre_encoded_signatures() {
501        let mut manager = DefiDataSubscriptionManager::new();
502
503        // Register with pre-encoded keccak256 hashes (with 0x prefix)
504        manager.register_dex_for_subscriptions(
505            DexType::UniswapV3,
506            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",
507            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde",
508            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c",
509        );
510
511        // Should store them unchanged (normalized to lowercase)
512        let swap_sig = manager
513            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
514            .unwrap();
515        let mint_sig = manager
516            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
517            .unwrap();
518        let burn_sig = manager
519            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
520            .unwrap();
521
522        assert_eq!(
523            swap_sig,
524            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
525        );
526        assert_eq!(
527            mint_sig,
528            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
529        );
530        assert_eq!(
531            burn_sig,
532            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
533        );
534    }
535
536    #[rstest]
537    fn test_register_with_pre_encoded_signatures_no_prefix() {
538        let mut manager = DefiDataSubscriptionManager::new();
539
540        // Register with pre-encoded hashes without 0x prefix
541        manager.register_dex_for_subscriptions(
542            DexType::UniswapV3,
543            "c42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",
544            "7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde",
545            "0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c",
546        );
547
548        // Should add 0x prefix and normalize to lowercase
549        let swap_sig = manager
550            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
551            .unwrap();
552        let mint_sig = manager
553            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
554            .unwrap();
555        let burn_sig = manager
556            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
557            .unwrap();
558
559        assert_eq!(
560            swap_sig,
561            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
562        );
563        assert_eq!(
564            mint_sig,
565            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
566        );
567        assert_eq!(
568            burn_sig,
569            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
570        );
571    }
572}