1use std::str::FromStr;
17
18use alloy::primitives::{Address, I256, U160, U256};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 defi::{
22 PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap, SharedChain, SharedDex,
23 data::{DexPoolData, PoolFeeCollect, PoolFlash},
24 validation::validate_address,
25 },
26 identifiers::InstrumentId,
27};
28use sqlx::{FromRow, Row, postgres::PgRow};
29
30#[derive(Debug)]
35pub struct TokenRow {
36 pub address: Address,
37 pub name: String,
38 pub symbol: String,
39 pub decimals: i32,
40}
41
42impl<'r> FromRow<'r, PgRow> for TokenRow {
43 fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
44 let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
45 let name = row.try_get::<String, _>("name")?;
46 let symbol = row.try_get::<String, _>("symbol")?;
47 let decimals = row.try_get::<i32, _>("decimals")?;
48
49 let token = Self {
50 address,
51 name,
52 symbol,
53 decimals,
54 };
55 Ok(token)
56 }
57}
58
59#[derive(Debug)]
60pub struct PoolRow {
61 pub address: Address,
62 pub pool_identifier: String,
63 pub dex_name: String,
64 pub creation_block: i64,
65 pub token0_chain: i32,
66 pub token0_address: Address,
67 pub token1_chain: i32,
68 pub token1_address: Address,
69 pub fee: Option<i32>,
70 pub tick_spacing: Option<i32>,
71 pub initial_tick: Option<i32>,
72 pub initial_sqrt_price_x96: Option<String>,
73 pub hook_address: Option<String>,
74}
75
76impl<'r> FromRow<'r, PgRow> for PoolRow {
77 fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
78 let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
79 let pool_identifier = row.try_get::<String, _>("pool_identifier")?;
80 let dex_name = row.try_get::<String, _>("dex_name")?;
81 let creation_block = row.try_get::<i64, _>("creation_block")?;
82 let token0_chain = row.try_get::<i32, _>("token0_chain")?;
83 let token0_address =
84 validate_address(row.try_get::<String, _>("token0_address")?.as_str()).unwrap();
85 let token1_chain = row.try_get::<i32, _>("token1_chain")?;
86 let token1_address =
87 validate_address(row.try_get::<String, _>("token1_address")?.as_str()).unwrap();
88 let fee = row.try_get::<Option<i32>, _>("fee")?;
89 let tick_spacing = row.try_get::<Option<i32>, _>("tick_spacing")?;
90 let initial_tick = row.try_get::<Option<i32>, _>("initial_tick")?;
91 let initial_sqrt_price_x96 = row.try_get::<Option<String>, _>("initial_sqrt_price_x96")?;
92 let hook_address = row.try_get::<Option<String>, _>("hook_address")?;
93
94 Ok(Self {
95 address,
96 pool_identifier,
97 dex_name,
98 creation_block,
99 token0_chain,
100 token0_address,
101 token1_chain,
102 token1_address,
103 fee,
104 tick_spacing,
105 initial_tick,
106 initial_sqrt_price_x96,
107 hook_address,
108 })
109 }
110}
111
112#[derive(Debug)]
114pub struct BlockTimestampRow {
115 pub number: u64,
117 pub timestamp: UnixNanos,
119}
120
121impl FromRow<'_, PgRow> for BlockTimestampRow {
122 fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
123 let number = row.try_get::<i64, _>("number")? as u64;
124 let timestamp = row.try_get::<String, _>("timestamp")?;
125 Ok(Self {
126 number,
127 timestamp: UnixNanos::from(timestamp),
128 })
129 }
130}
131
132pub fn transform_row_to_dex_pool_data(
141 row: &PgRow,
142 chain: SharedChain,
143 dex: SharedDex,
144 instrument_id: InstrumentId,
145) -> Result<DexPoolData, sqlx::Error> {
146 let event_type = row.try_get::<String, _>("event_type")?;
147 let pool_identifier_str = row.try_get::<String, _>("pool_identifier")?;
148 let pool_identifier = pool_identifier_str
149 .parse()
150 .map_err(|e| sqlx::Error::Decode(format!("Invalid pool identifier: {e}").into()))?;
151 let block = row.try_get::<i64, _>("block")? as u64;
152 let transaction_hash = row.try_get::<String, _>("transaction_hash")?;
153 let transaction_index = row.try_get::<i32, _>("transaction_index")? as u32;
154 let log_index = row.try_get::<i32, _>("log_index")? as u32;
155
156 match event_type.as_str() {
157 "swap" => {
158 let sender_str = row
159 .try_get::<Option<String>, _>("sender")?
160 .ok_or_else(|| sqlx::Error::Decode("Missing sender for swap event".into()))?;
161 let sender = validate_address(&sender_str)
162 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
163
164 let recipient_str = row
165 .try_get::<Option<String>, _>("recipient")?
166 .ok_or_else(|| sqlx::Error::Decode("Missing recipient for swap event".into()))?;
167 let recipient = validate_address(&recipient_str)
168 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
169
170 let sqrt_price_x96_str = row
171 .try_get::<Option<String>, _>("sqrt_price_x96")?
172 .ok_or_else(|| {
173 sqlx::Error::Decode("Missing sqrt_price_x96 for swap event".into())
174 })?;
175 let sqrt_price_x96 = U160::from_str(&sqrt_price_x96_str).map_err(|e| {
176 sqlx::Error::Decode(
177 format!("Invalid sqrt_price_x96 '{sqrt_price_x96_str}': {e}").into(),
178 )
179 })?;
180
181 let swap_liquidity_str = row.try_get::<String, _>("swap_liquidity")?;
182 let swap_liquidity = u128::from_str(&swap_liquidity_str)
183 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
184
185 let swap_tick = row.try_get::<i32, _>("swap_tick")?;
186
187 let swap_amount0_str = row
188 .try_get::<Option<String>, _>("swap_amount0")?
189 .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount0 for swap event".into()))?;
190 let amount0 = I256::from_str(&swap_amount0_str).map_err(|e| {
191 sqlx::Error::Decode(
192 format!("Invalid swap_amount0 '{swap_amount0_str}': {e}").into(),
193 )
194 })?;
195
196 let swap_amount1_str = row
197 .try_get::<Option<String>, _>("swap_amount1")?
198 .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount1 for swap event".into()))?;
199 let amount1 = I256::from_str(&swap_amount1_str).map_err(|e| {
200 sqlx::Error::Decode(
201 format!("Invalid swap_amount1 '{swap_amount1_str}': {e}").into(),
202 )
203 })?;
204
205 let pool_swap = PoolSwap::new(
206 chain,
207 dex,
208 instrument_id,
209 pool_identifier,
210 block,
211 transaction_hash,
212 transaction_index,
213 log_index,
214 None, sender,
216 recipient,
217 amount0,
218 amount1,
219 sqrt_price_x96,
220 swap_liquidity,
221 swap_tick,
222 );
223
224 Ok(DexPoolData::Swap(pool_swap))
225 }
226 "liquidity" => {
227 let kind_str = row
228 .try_get::<Option<String>, _>("liquidity_event_type")?
229 .ok_or_else(|| {
230 sqlx::Error::Decode("Missing liquidity_event_type for liquidity event".into())
231 })?;
232
233 let kind = match kind_str.as_str() {
234 "Mint" => PoolLiquidityUpdateType::Mint,
235 "Burn" => PoolLiquidityUpdateType::Burn,
236 _ => {
237 return Err(sqlx::Error::Decode(
238 format!("Unknown liquidity update type: {kind_str}").into(),
239 ));
240 }
241 };
242
243 let sender = row
244 .try_get::<Option<String>, _>("sender")?
245 .map(|s| validate_address(&s))
246 .transpose()
247 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
248
249 let owner_str = row
250 .try_get::<Option<String>, _>("owner")?
251 .ok_or_else(|| sqlx::Error::Decode("Missing owner for liquidity event".into()))?;
252 let owner = validate_address(&owner_str)
253 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
254
255 let position_liquidity_str = row.try_get::<String, _>("position_liquidity")?;
257 let position_liquidity = position_liquidity_str.parse::<u128>().map_err(|e| {
258 sqlx::Error::Decode(
259 format!("Invalid position_liquidity '{position_liquidity_str}': {e}").into(),
260 )
261 })?;
262
263 let amount0_str = row.try_get::<String, _>("amount0")?;
264 let amount0 = U256::from_str_radix(&amount0_str, 10).map_err(|e| {
265 sqlx::Error::Decode(format!("Invalid amount0 '{amount0_str}': {e}").into())
266 })?;
267
268 let amount1_str = row.try_get::<String, _>("amount1")?;
269 let amount1 = U256::from_str_radix(&amount1_str, 10).map_err(|e| {
270 sqlx::Error::Decode(format!("Invalid amount1 '{amount1_str}': {e}").into())
271 })?;
272
273 let tick_lower = row
274 .try_get::<Option<i32>, _>("tick_lower")?
275 .ok_or_else(|| {
276 sqlx::Error::Decode("Missing tick_lower for liquidity event".into())
277 })?;
278
279 let tick_upper = row
280 .try_get::<Option<i32>, _>("tick_upper")?
281 .ok_or_else(|| {
282 sqlx::Error::Decode("Missing tick_upper for liquidity event".into())
283 })?;
284
285 let pool_liquidity_update = PoolLiquidityUpdate::new(
286 chain,
287 dex,
288 instrument_id,
289 pool_identifier,
290 kind,
291 block,
292 transaction_hash,
293 transaction_index,
294 log_index,
295 sender,
296 owner,
297 position_liquidity,
298 amount0,
299 amount1,
300 tick_lower,
301 tick_upper,
302 None, );
304
305 Ok(DexPoolData::LiquidityUpdate(pool_liquidity_update))
306 }
307 "collect" => {
308 let owner_str = row
309 .try_get::<Option<String>, _>("owner")?
310 .ok_or_else(|| sqlx::Error::Decode("Missing owner for collect event".into()))?;
311 let owner = validate_address(&owner_str)
312 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
313
314 let amount0_str = row.try_get::<String, _>("amount0")?;
316 let amount0 = amount0_str.parse::<u128>().map_err(|e| {
317 sqlx::Error::Decode(format!("Invalid amount0 '{amount0_str}': {e}").into())
318 })?;
319
320 let amount1_str = row.try_get::<String, _>("amount1")?;
321 let amount1 = amount1_str.parse::<u128>().map_err(|e| {
322 sqlx::Error::Decode(format!("Invalid amount1 '{amount1_str}': {e}").into())
323 })?;
324
325 let tick_lower = row
326 .try_get::<Option<i32>, _>("tick_lower")?
327 .ok_or_else(|| {
328 sqlx::Error::Decode("Missing tick_lower for collect event".into())
329 })?;
330
331 let tick_upper = row
332 .try_get::<Option<i32>, _>("tick_upper")?
333 .ok_or_else(|| {
334 sqlx::Error::Decode("Missing tick_upper for collect event".into())
335 })?;
336
337 let pool_fee_collect = PoolFeeCollect::new(
338 chain,
339 dex,
340 instrument_id,
341 pool_identifier,
342 block,
343 transaction_hash,
344 transaction_index,
345 log_index,
346 owner,
347 amount0,
348 amount1,
349 tick_lower,
350 tick_upper,
351 None, );
353
354 Ok(DexPoolData::FeeCollect(pool_fee_collect))
355 }
356 "flash" => {
357 let sender_str = row
358 .try_get::<Option<String>, _>("sender")?
359 .ok_or_else(|| sqlx::Error::Decode("Missing sender for flash event".into()))?;
360 let sender = validate_address(&sender_str)
361 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
362
363 let recipient_str = row
364 .try_get::<Option<String>, _>("recipient")?
365 .ok_or_else(|| sqlx::Error::Decode("Missing recipient for flash event".into()))?;
366 let recipient = validate_address(&recipient_str)
367 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
368
369 let flash_amount0_str = row.try_get::<String, _>("flash_amount0")?;
371 let amount0 = U256::from_str_radix(&flash_amount0_str, 10).map_err(|e| {
372 sqlx::Error::Decode(
373 format!("Invalid flash_amount0 '{flash_amount0_str}': {e}").into(),
374 )
375 })?;
376
377 let flash_amount1_str = row.try_get::<String, _>("flash_amount1")?;
378 let amount1 = U256::from_str_radix(&flash_amount1_str, 10).map_err(|e| {
379 sqlx::Error::Decode(
380 format!("Invalid flash_amount1 '{flash_amount1_str}': {e}").into(),
381 )
382 })?;
383
384 let flash_paid0_str = row.try_get::<String, _>("flash_paid0")?;
385 let paid0 = U256::from_str_radix(&flash_paid0_str, 10).map_err(|e| {
386 sqlx::Error::Decode(format!("Invalid flash_paid0 '{flash_paid0_str}': {e}").into())
387 })?;
388
389 let flash_paid1_str = row.try_get::<String, _>("flash_paid1")?;
390 let paid1 = U256::from_str_radix(&flash_paid1_str, 10).map_err(|e| {
391 sqlx::Error::Decode(format!("Invalid flash_paid1 '{flash_paid1_str}': {e}").into())
392 })?;
393
394 let pool_flash = PoolFlash::new(
395 chain,
396 dex,
397 instrument_id,
398 pool_identifier,
399 block,
400 transaction_hash,
401 transaction_index,
402 log_index,
403 None, sender,
405 recipient,
406 amount0,
407 amount1,
408 paid0,
409 paid1,
410 );
411
412 Ok(DexPoolData::Flash(pool_flash))
413 }
414 _ => Err(sqlx::Error::Decode(
415 format!("Unknown event type: {event_type}").into(),
416 )),
417 }
418}