nautilus_blockchain/exchanges/parsing/uniswap_v3/
collect.rs1use alloy::{dyn_abi::SolType, primitives::Address, sol};
17use nautilus_model::defi::{PoolIdentifier, SharedDex, rpc::RpcLog};
18use ustr::Ustr;
19
20use crate::{
21 events::collect::CollectEvent,
22 hypersync::{
23 HypersyncLog,
24 helpers::{
25 extract_address_from_topic, extract_block_number, extract_log_index,
26 extract_transaction_hash, extract_transaction_index, validate_event_signature_hash,
27 },
28 },
29 rpc::helpers as rpc_helpers,
30};
31
32const COLLECT_EVENT_SIGNATURE_HASH: &str =
33 "70935338e69775456a85ddef226c395fb668b63fa0115f5f20610b388e6ca9c0";
34
35sol! {
39 struct CollectEventData {
40 address recipient;
41 uint128 amount0;
42 uint128 amount1;
43 }
44}
45
46pub fn parse_collect_event_hypersync(
56 dex: SharedDex,
57 log: HypersyncLog,
58) -> anyhow::Result<CollectEvent> {
59 validate_event_signature_hash("Collect", COLLECT_EVENT_SIGNATURE_HASH, &log)?;
60
61 let owner = extract_address_from_topic(&log, 1, "owner")?;
62
63 let tick_lower = match log.topics.get(2).and_then(|t| t.as_ref()) {
65 Some(topic) => {
66 let tick_lower_bytes: [u8; 32] = topic.as_ref().try_into()?;
67 i32::from_be_bytes(tick_lower_bytes[28..32].try_into()?)
68 }
69 None => anyhow::bail!("Missing tickLower in topic2 when parsing collect event"),
70 };
71
72 let tick_upper = match log.topics.get(3).and_then(|t| t.as_ref()) {
74 Some(topic) => {
75 let tick_upper_bytes: [u8; 32] = topic.as_ref().try_into()?;
76 i32::from_be_bytes(tick_upper_bytes[28..32].try_into()?)
77 }
78 None => anyhow::bail!("Missing tickUpper in topic3 when parsing collect event"),
79 };
80
81 if let Some(data) = &log.data {
82 let data_bytes = data.as_ref();
83
84 if data_bytes.len() < 3 * 32 {
86 anyhow::bail!("Collect event data is too short");
87 }
88
89 let decoded = match <CollectEventData as SolType>::abi_decode(data_bytes) {
91 Ok(decoded) => decoded,
92 Err(e) => anyhow::bail!("Failed to decode collect event data: {e}"),
93 };
94
95 let pool_address = Address::from_slice(
96 log.address
97 .clone()
98 .expect("Contract address should be set in logs")
99 .as_ref(),
100 );
101 let pool_identifier = PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
102 Ok(CollectEvent::new(
103 dex,
104 pool_identifier,
105 extract_block_number(&log)?,
106 extract_transaction_hash(&log)?,
107 extract_transaction_index(&log)?,
108 extract_log_index(&log)?,
109 owner,
110 decoded.recipient,
111 tick_lower,
112 tick_upper,
113 decoded.amount0,
114 decoded.amount1,
115 ))
116 } else {
117 Err(anyhow::anyhow!("Missing data in collect event log"))
118 }
119}
120
121pub fn parse_collect_event_rpc(dex: SharedDex, log: &RpcLog) -> anyhow::Result<CollectEvent> {
127 rpc_helpers::validate_event_signature(log, COLLECT_EVENT_SIGNATURE_HASH, "Collect")?;
128
129 let owner = rpc_helpers::extract_address_from_topic(log, 1, "owner")?;
130
131 let tick_lower_bytes = rpc_helpers::extract_topic_bytes(log, 2)?;
133 let tick_lower = i32::from_be_bytes(tick_lower_bytes[28..32].try_into()?);
134
135 let tick_upper_bytes = rpc_helpers::extract_topic_bytes(log, 3)?;
137 let tick_upper = i32::from_be_bytes(tick_upper_bytes[28..32].try_into()?);
138
139 let data_bytes = rpc_helpers::extract_data_bytes(log)?;
140
141 if data_bytes.len() < 3 * 32 {
143 anyhow::bail!("Collect event data is too short");
144 }
145
146 let decoded = match <CollectEventData as SolType>::abi_decode(&data_bytes) {
148 Ok(decoded) => decoded,
149 Err(e) => anyhow::bail!("Failed to decode collect event data: {e}"),
150 };
151
152 let pool_address = rpc_helpers::extract_address(log)?;
153 let pool_identifier = PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
154 Ok(CollectEvent::new(
155 dex,
156 pool_identifier,
157 rpc_helpers::extract_block_number(log)?,
158 rpc_helpers::extract_transaction_hash(log)?,
159 rpc_helpers::extract_transaction_index(log)?,
160 rpc_helpers::extract_log_index(log)?,
161 owner,
162 decoded.recipient,
163 tick_lower,
164 tick_upper,
165 decoded.amount0,
166 decoded.amount1,
167 ))
168}
169
170#[cfg(test)]
171mod tests {
172 use rstest::*;
173 use serde_json::json;
174
175 use super::*;
176 use crate::exchanges::arbitrum;
177
178 #[fixture]
185 fn hypersync_log() -> HypersyncLog {
186 let log_json = json!({
187 "removed": null,
188 "log_index": "0x11",
189 "transaction_index": "0x5",
190 "transaction_hash": "0x0c70f6d6bcf8508ba620b9d1250c95ad67108e35707c5d7456349ea207051bae",
191 "block_hash": null,
192 "block_number": "0x175a6484",
193 "address": "0xd13040d4fe917EE704158CfCB3338dCd2838B245",
194 "data": "0x000000000000000000000000a61da382c18d9d5beb905ea192bae25e4c15d5120000000000000000000000000000000000000000000000bf28af828dd3ae56340000000000000000000000000000000000000000000000000665eae21b1cffc8",
195 "topics": [
196 "0x70935338e69775456a85ddef226c395fb668b63fa0115f5f20610b388e6ca9c0",
197 "0x000000000000000000000000c36442b4a4522e871399cd717abdd847ab11fe88",
198 "0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffdde09",
199 "0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffdde0c"
200 ]
201 });
202 serde_json::from_value(log_json).expect("Failed to deserialize HyperSync log")
203 }
204
205 #[fixture]
207 fn rpc_log() -> RpcLog {
208 let log_json = json!({
209 "removed": false,
210 "logIndex": "0x11",
211 "transactionIndex": "0x5",
212 "transactionHash": "0x0c70f6d6bcf8508ba620b9d1250c95ad67108e35707c5d7456349ea207051bae",
213 "blockHash": "0xe925eaa1f5178ceedfa24043a974edb928ddab7195600b6b99ff5403fbf13c8b",
214 "blockNumber": "0x175a6484",
215 "address": "0xd13040d4fe917EE704158CfCB3338dCd2838B245",
216 "data": "0x000000000000000000000000a61da382c18d9d5beb905ea192bae25e4c15d5120000000000000000000000000000000000000000000000bf28af828dd3ae56340000000000000000000000000000000000000000000000000665eae21b1cffc8",
217 "topics": [
218 "0x70935338e69775456a85ddef226c395fb668b63fa0115f5f20610b388e6ca9c0",
219 "0x000000000000000000000000c36442b4a4522e871399cd717abdd847ab11fe88",
220 "0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffdde09",
221 "0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffdde0c"
222 ]
223 });
224 serde_json::from_value(log_json).expect("Failed to deserialize RPC log")
225 }
226
227 #[rstest]
228 fn test_parse_collect_event_hypersync(hypersync_log: HypersyncLog) {
229 let dex = arbitrum::UNISWAP_V3.dex.clone();
230 let event = parse_collect_event_hypersync(dex, hypersync_log).unwrap();
231
232 assert_eq!(
233 event.pool_identifier.to_string(),
234 "0xd13040d4fe917EE704158CfCB3338dCd2838B245"
235 );
236 assert_eq!(
237 event.owner.to_string().to_lowercase(),
238 "0xc36442b4a4522e871399cd717abdd847ab11fe88"
239 );
240 assert_eq!(
241 event.recipient.to_string().to_lowercase(),
242 "0xa61da382c18d9d5beb905ea192bae25e4c15d512"
243 );
244 assert_eq!(event.tick_lower, -139767);
245 assert_eq!(event.tick_upper, -139764);
246 let expected_amount0 = u128::from_str_radix("bf28af828dd3ae5634", 16).unwrap();
247 assert_eq!(event.amount0, expected_amount0);
248 let expected_amount1 = u128::from_str_radix("665eae21b1cffc8", 16).unwrap();
249 assert_eq!(event.amount1, expected_amount1);
250 assert_eq!(event.block_number, 391799940);
251 }
252
253 #[rstest]
254 fn test_parse_collect_event_rpc(rpc_log: RpcLog) {
255 let dex = arbitrum::UNISWAP_V3.dex.clone();
256 let event = parse_collect_event_rpc(dex, &rpc_log).unwrap();
257
258 assert_eq!(
259 event.pool_identifier.to_string(),
260 "0xd13040d4fe917EE704158CfCB3338dCd2838B245"
261 );
262 assert_eq!(
263 event.owner.to_string().to_lowercase(),
264 "0xc36442b4a4522e871399cd717abdd847ab11fe88"
265 );
266 assert_eq!(
267 event.recipient.to_string().to_lowercase(),
268 "0xa61da382c18d9d5beb905ea192bae25e4c15d512"
269 );
270 assert_eq!(event.tick_lower, -139767);
271 assert_eq!(event.tick_upper, -139764);
272 let expected_amount0 = u128::from_str_radix("bf28af828dd3ae5634", 16).unwrap();
273 assert_eq!(event.amount0, expected_amount0);
274 let expected_amount1 = u128::from_str_radix("665eae21b1cffc8", 16).unwrap();
275 assert_eq!(event.amount1, expected_amount1);
276 assert_eq!(event.block_number, 391799940);
277 }
278
279 #[rstest]
280 fn test_hypersync_rpc_match(hypersync_log: HypersyncLog, rpc_log: RpcLog) {
281 let dex = arbitrum::UNISWAP_V3.dex.clone();
282 let event_hypersync = parse_collect_event_hypersync(dex.clone(), hypersync_log).unwrap();
283 let event_rpc = parse_collect_event_rpc(dex, &rpc_log).unwrap();
284
285 assert_eq!(event_hypersync.pool_identifier, event_rpc.pool_identifier);
286 assert_eq!(event_hypersync.owner, event_rpc.owner);
287 assert_eq!(event_hypersync.recipient, event_rpc.recipient);
288 assert_eq!(event_hypersync.tick_lower, event_rpc.tick_lower);
289 assert_eq!(event_hypersync.tick_upper, event_rpc.tick_upper);
290 assert_eq!(event_hypersync.amount0, event_rpc.amount0);
291 assert_eq!(event_hypersync.amount1, event_rpc.amount1);
292 assert_eq!(event_hypersync.block_number, event_rpc.block_number);
293 assert_eq!(event_hypersync.transaction_hash, event_rpc.transaction_hash);
294 }
295}