nautilus_blockchain/data/
subscription.rs1use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_model::defi::DexType;
19
20#[derive(Debug)]
25pub struct DefiDataSubscriptionManager {
26 subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
27 pool_swap_event_encoded: AHashMap<DexType, String>,
28 subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
29 pool_mint_event_encoded: AHashMap<DexType, String>,
30 subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
31 pool_burn_event_encoded: AHashMap<DexType, String>,
32}
33
34impl Default for DefiDataSubscriptionManager {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl DefiDataSubscriptionManager {
41 #[must_use]
43 pub fn new() -> Self {
44 Self {
45 subscribed_pool_burns: AHashMap::new(),
46 subscribed_pool_mints: AHashMap::new(),
47 subscribed_pool_swaps: AHashMap::new(),
48 pool_swap_event_encoded: AHashMap::new(),
49 pool_burn_event_encoded: AHashMap::new(),
50 pool_mint_event_encoded: AHashMap::new(),
51 }
52 }
53
54 #[must_use]
56 pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
57 let mut unique_addresses = AHashSet::new();
58
59 if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
60 unique_addresses.extend(addresses.iter().copied());
61 }
62 if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
63 unique_addresses.extend(addresses.iter().copied());
64 }
65 if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
66 unique_addresses.extend(addresses.iter().copied());
67 }
68
69 unique_addresses.into_iter().collect()
70 }
71
72 #[must_use]
74 pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
75 let mut result = Vec::new();
76
77 if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
78 result.push(swap_event_signature.clone());
79 }
80 if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
81 result.push(mint_event_signature.clone());
82 }
83 if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
84 result.push(burn_event_signature.clone());
85 }
86
87 result
88 }
89
90 #[must_use]
92 pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
93 self.pool_swap_event_encoded.get(dex).cloned()
94 }
95
96 #[must_use]
98 pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
99 self.pool_mint_event_encoded.get(dex).cloned()
100 }
101 #[must_use]
103 pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
104 self.pool_burn_event_encoded.get(dex).cloned()
105 }
106
107 fn normalize_topic(sig: &str) -> String {
116 let s = sig.trim();
117
118 if let Some(rest) = s.strip_prefix("0x") {
120 if rest.len() == 64 && rest.chars().all(|c| c.is_ascii_hexdigit()) {
121 return format!("0x{}", rest.to_ascii_lowercase());
122 }
123 }
124
125 if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
127 return format!("0x{}", s.to_ascii_lowercase());
128 }
129
130 format!("0x{}", hex::encode(keccak256(s.as_bytes())))
132 }
133
134 pub fn register_dex_for_subscriptions(
139 &mut self,
140 dex: DexType,
141 swap_event_signature: &str,
142 mint_event_signature: &str,
143 burn_event_signature: &str,
144 ) {
145 self.subscribed_pool_swaps.insert(dex, AHashSet::new());
146 self.pool_swap_event_encoded
147 .insert(dex, Self::normalize_topic(swap_event_signature));
148
149 self.subscribed_pool_mints.insert(dex, AHashSet::new());
150 self.pool_mint_event_encoded
151 .insert(dex, Self::normalize_topic(mint_event_signature));
152
153 self.subscribed_pool_burns.insert(dex, AHashSet::new());
154 self.pool_burn_event_encoded
155 .insert(dex, Self::normalize_topic(burn_event_signature));
156
157 tracing::info!("Registered DEX for subscriptions: {dex:?}");
158 }
159
160 pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
162 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
163 pool_set.insert(address);
164 } else {
165 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
166 }
167 }
168
169 pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
171 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
172 pool_set.insert(address);
173 } else {
174 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
175 }
176 }
177
178 pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
180 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
181 pool_set.insert(address);
182 } else {
183 tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
184 }
185 }
186
187 pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
189 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
190 pool_set.remove(&address);
191 } else {
192 tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
193 }
194 }
195
196 pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
198 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
199 pool_set.remove(&address);
200 } else {
201 tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
202 }
203 }
204
205 pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
207 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
208 pool_set.remove(&address);
209 } else {
210 tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
211 }
212 }
213}
214
215#[cfg(test)]
220mod tests {
221 use alloy::primitives::address;
222 use nautilus_model::defi::DexType;
223 use rstest::{fixture, rstest};
224
225 use super::*;
226
227 #[fixture]
228 fn manager() -> DefiDataSubscriptionManager {
229 DefiDataSubscriptionManager::new()
230 }
231
232 #[fixture]
233 fn registered_manager() -> DefiDataSubscriptionManager {
234 let mut manager = DefiDataSubscriptionManager::new();
235 manager.register_dex_for_subscriptions(
236 DexType::UniswapV3,
237 "Swap(address,address,int256,int256,uint160,uint128,int24)",
238 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
239 "Burn(address,int24,int24,uint128,uint256,uint256)",
240 );
241 manager
242 }
243
244 #[rstest]
245 fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
246 assert_eq!(
247 manager
248 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
249 .len(),
250 0
251 );
252 assert_eq!(
253 manager
254 .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
255 .len(),
256 0
257 );
258 assert!(
259 manager
260 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
261 .is_none()
262 );
263 assert!(
264 manager
265 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
266 .is_none()
267 );
268 assert!(
269 manager
270 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
271 .is_none()
272 );
273 }
274
275 #[rstest]
276 fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
277 let signatures =
279 registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
280 assert_eq!(signatures.len(), 3);
281
282 assert!(
284 registered_manager
285 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
286 .is_some()
287 );
288 assert!(
289 registered_manager
290 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
291 .is_some()
292 );
293 assert!(
294 registered_manager
295 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
296 .is_some()
297 );
298 }
299
300 #[rstest]
301 fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
302 let pool_address = address!("1234567890123456789012345678901234567890");
303
304 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
306
307 let addresses =
308 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
309 assert_eq!(addresses.len(), 1);
310 assert_eq!(addresses[0], pool_address);
311 }
312
313 #[rstest]
314 fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
315 let pool_address = address!("1234567890123456789012345678901234567890");
316
317 manager.subscribe_swaps(DexType::UniswapV3, pool_address);
319 manager.subscribe_mints(DexType::UniswapV3, pool_address);
320 manager.subscribe_burns(DexType::UniswapV3, pool_address);
321
322 let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
324 assert_eq!(addresses.len(), 0);
325 }
326
327 #[rstest]
328 fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
329 let pool_address = address!("1234567890123456789012345678901234567890");
330
331 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
333
334 assert_eq!(
336 registered_manager
337 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
338 .len(),
339 1
340 );
341
342 registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
344
345 assert_eq!(
347 registered_manager
348 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
349 .len(),
350 0
351 );
352 }
353
354 #[rstest]
355 fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
356 let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
357 let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
358 let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);
359
360 assert!(swap_sig.is_some() && swap_sig.unwrap().starts_with("0x"));
362 assert!(mint_sig.is_some() && mint_sig.unwrap().starts_with("0x"));
363 assert!(burn_sig.is_some() && burn_sig.unwrap().starts_with("0x"));
364 }
365
366 #[rstest]
367 fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
368 let pool_address = address!("1234567890123456789012345678901234567890");
369
370 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
372 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
373
374 let addresses =
376 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
377 assert_eq!(addresses.len(), 1);
378 }
379
380 #[rstest]
381 fn test_get_combined_addresses_from_all_events(
382 mut registered_manager: DefiDataSubscriptionManager,
383 ) {
384 let pool1 = address!("1111111111111111111111111111111111111111");
385 let pool2 = address!("2222222222222222222222222222222222222222");
386 let pool3 = address!("3333333333333333333333333333333333333333");
387
388 registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
390 registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
391 registered_manager.subscribe_burns(DexType::UniswapV3, pool3);
392
393 let addresses =
395 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
396 assert_eq!(addresses.len(), 3);
397 assert!(addresses.contains(&pool1));
398 assert!(addresses.contains(&pool2));
399 assert!(addresses.contains(&pool3));
400 }
401
402 #[rstest]
403 fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
404 let swap_sig = registered_manager
407 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
408 .unwrap();
409
410 assert!(swap_sig.starts_with("0x"));
412 assert_eq!(swap_sig.len(), 66); let hex_part = &swap_sig[2..];
416 assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
417 }
418
419 #[rstest]
420 #[case(DexType::UniswapV3)]
421 #[case(DexType::UniswapV2)]
422 fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
423 let mut manager = DefiDataSubscriptionManager::new();
424 let pool1 = address!("1111111111111111111111111111111111111111");
425 let pool2 = address!("2222222222222222222222222222222222222222");
426
427 manager.register_dex_for_subscriptions(
429 dex_type,
430 "Swap(address,uint256,uint256)",
431 "Mint(address,uint256)",
432 "Burn(address,uint256)",
433 );
434
435 manager.subscribe_swaps(dex_type, pool1);
437 manager.subscribe_swaps(dex_type, pool2);
438 manager.subscribe_mints(dex_type, pool1);
439 manager.subscribe_burns(dex_type, pool2);
440
441 let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
443 assert_eq!(addresses.len(), 2);
444 assert!(addresses.contains(&pool1));
445 assert!(addresses.contains(&pool2));
446
447 let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
449 assert_eq!(signatures.len(), 3);
450
451 manager.unsubscribe_swaps(dex_type, pool1);
453 manager.unsubscribe_burns(dex_type, pool2);
454
455 let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
457 assert!(remaining.contains(&pool1)); assert!(remaining.contains(&pool2)); }
460
461 #[rstest]
462 fn test_register_with_raw_signatures() {
463 let mut manager = DefiDataSubscriptionManager::new();
464
465 manager.register_dex_for_subscriptions(
467 DexType::UniswapV3,
468 "Swap(address,address,int256,int256,uint160,uint128,int24)",
469 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
470 "Burn(address,int24,int24,uint128,uint256,uint256)",
471 );
472
473 let swap_sig = manager
475 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
476 .unwrap();
477 let mint_sig = manager
478 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
479 .unwrap();
480 let burn_sig = manager
481 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
482 .unwrap();
483
484 assert_eq!(
486 swap_sig,
487 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
488 );
489 assert_eq!(
490 mint_sig,
491 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
492 );
493 assert_eq!(
494 burn_sig,
495 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
496 );
497 }
498
499 #[rstest]
500 fn test_register_with_pre_encoded_signatures() {
501 let mut manager = DefiDataSubscriptionManager::new();
502
503 manager.register_dex_for_subscriptions(
505 DexType::UniswapV3,
506 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",
507 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde",
508 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c",
509 );
510
511 let swap_sig = manager
513 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
514 .unwrap();
515 let mint_sig = manager
516 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
517 .unwrap();
518 let burn_sig = manager
519 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
520 .unwrap();
521
522 assert_eq!(
523 swap_sig,
524 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
525 );
526 assert_eq!(
527 mint_sig,
528 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
529 );
530 assert_eq!(
531 burn_sig,
532 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
533 );
534 }
535
536 #[rstest]
537 fn test_register_with_pre_encoded_signatures_no_prefix() {
538 let mut manager = DefiDataSubscriptionManager::new();
539
540 manager.register_dex_for_subscriptions(
542 DexType::UniswapV3,
543 "c42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",
544 "7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde",
545 "0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c",
546 );
547
548 let swap_sig = manager
550 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
551 .unwrap();
552 let mint_sig = manager
553 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
554 .unwrap();
555 let burn_sig = manager
556 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
557 .unwrap();
558
559 assert_eq!(
560 swap_sig,
561 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
562 );
563 assert_eq!(
564 mint_sig,
565 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
566 );
567 assert_eq!(
568 burn_sig,
569 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
570 );
571 }
572}