1use std::str::FromStr;
17
18use alloy::primitives::{Address, I256, U160, U256};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 defi::{
22 PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap,
23 data::{DexPoolData, PoolFeeCollect, PoolFlash},
24 validation::validate_address,
25 },
26 identifiers::InstrumentId,
27};
28use sqlx::{FromRow, Row, postgres::PgRow};
29
30#[derive(Debug)]
35pub struct TokenRow {
36 pub address: Address,
37 pub name: String,
38 pub symbol: String,
39 pub decimals: i32,
40}
41
42impl<'r> FromRow<'r, PgRow> for TokenRow {
43 fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
44 let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
45 let name = row.try_get::<String, _>("name")?;
46 let symbol = row.try_get::<String, _>("symbol")?;
47 let decimals = row.try_get::<i32, _>("decimals")?;
48
49 let token = Self {
50 address,
51 name,
52 symbol,
53 decimals,
54 };
55 Ok(token)
56 }
57}
58
59#[derive(Debug)]
60pub struct PoolRow {
61 pub address: Address,
62 pub dex_name: String,
63 pub creation_block: i64,
64 pub token0_chain: i32,
65 pub token0_address: Address,
66 pub token1_chain: i32,
67 pub token1_address: Address,
68 pub fee: Option<i32>,
69 pub tick_spacing: Option<i32>,
70 pub initial_tick: Option<i32>,
71 pub initial_sqrt_price_x96: Option<String>,
72}
73
74impl<'r> FromRow<'r, PgRow> for PoolRow {
75 fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
76 let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
77 let dex_name = row.try_get::<String, _>("dex_name")?;
78 let creation_block = row.try_get::<i64, _>("creation_block")?;
79 let token0_chain = row.try_get::<i32, _>("token0_chain")?;
80 let token0_address =
81 validate_address(row.try_get::<String, _>("token0_address")?.as_str()).unwrap();
82 let token1_chain = row.try_get::<i32, _>("token1_chain")?;
83 let token1_address =
84 validate_address(row.try_get::<String, _>("token1_address")?.as_str()).unwrap();
85 let fee = row.try_get::<Option<i32>, _>("fee")?;
86 let tick_spacing = row.try_get::<Option<i32>, _>("tick_spacing")?;
87 let initial_tick = row.try_get::<Option<i32>, _>("initial_tick")?;
88 let initial_sqrt_price_x96 = row.try_get::<Option<String>, _>("initial_sqrt_price_x96")?;
89
90 Ok(Self {
91 address,
92 dex_name,
93 creation_block,
94 token0_chain,
95 token0_address,
96 token1_chain,
97 token1_address,
98 fee,
99 tick_spacing,
100 initial_tick,
101 initial_sqrt_price_x96,
102 })
103 }
104}
105
106#[derive(Debug)]
108pub struct BlockTimestampRow {
109 pub number: u64,
111 pub timestamp: UnixNanos,
113}
114
115impl FromRow<'_, PgRow> for BlockTimestampRow {
116 fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
117 let number = row.try_get::<i64, _>("number")? as u64;
118 let timestamp = row.try_get::<String, _>("timestamp")?;
119 Ok(Self {
120 number,
121 timestamp: UnixNanos::from(timestamp),
122 })
123 }
124}
125
126pub fn transform_row_to_dex_pool_data(
131 row: &PgRow,
132 chain: nautilus_model::defi::SharedChain,
133 dex: nautilus_model::defi::SharedDex,
134 instrument_id: InstrumentId,
135) -> Result<DexPoolData, sqlx::Error> {
136 let event_type = row.try_get::<String, _>("event_type")?;
137 let pool_address_str = row.try_get::<String, _>("pool_address")?;
138 let block = row.try_get::<i64, _>("block")? as u64;
139 let transaction_hash = row.try_get::<String, _>("transaction_hash")?;
140 let transaction_index = row.try_get::<i32, _>("transaction_index")? as u32;
141 let log_index = row.try_get::<i32, _>("log_index")? as u32;
142
143 let pool_address = validate_address(&pool_address_str)
144 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
145
146 match event_type.as_str() {
147 "swap" => {
148 let sender_str = row
149 .try_get::<Option<String>, _>("sender")?
150 .ok_or_else(|| sqlx::Error::Decode("Missing sender for swap event".into()))?;
151 let sender = validate_address(&sender_str)
152 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
153
154 let recipient_str = row
155 .try_get::<Option<String>, _>("recipient")?
156 .ok_or_else(|| sqlx::Error::Decode("Missing recipient for swap event".into()))?;
157 let recipient = validate_address(&recipient_str)
158 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
159
160 let sqrt_price_x96_str = row
161 .try_get::<Option<String>, _>("sqrt_price_x96")?
162 .ok_or_else(|| {
163 sqlx::Error::Decode("Missing sqrt_price_x96 for swap event".into())
164 })?;
165 let sqrt_price_x96 = U160::from_str(&sqrt_price_x96_str).map_err(|e| {
166 sqlx::Error::Decode(
167 format!("Invalid sqrt_price_x96 '{}': {}", sqrt_price_x96_str, e).into(),
168 )
169 })?;
170
171 let swap_liquidity_str = row.try_get::<String, _>("swap_liquidity")?;
172 let swap_liquidity = u128::from_str(&swap_liquidity_str)
173 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
174
175 let swap_tick = row.try_get::<i32, _>("swap_tick")?;
176
177 let swap_amount0_str = row
178 .try_get::<Option<String>, _>("swap_amount0")?
179 .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount0 for swap event".into()))?;
180 let amount0 = I256::from_str(&swap_amount0_str).map_err(|e| {
181 sqlx::Error::Decode(
182 format!("Invalid swap_amount0 '{}': {}", swap_amount0_str, e).into(),
183 )
184 })?;
185
186 let swap_amount1_str = row
187 .try_get::<Option<String>, _>("swap_amount1")?
188 .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount1 for swap event".into()))?;
189 let amount1 = I256::from_str(&swap_amount1_str).map_err(|e| {
190 sqlx::Error::Decode(
191 format!("Invalid swap_amount1 '{}': {}", swap_amount1_str, e).into(),
192 )
193 })?;
194
195 let pool_swap = PoolSwap::new(
196 chain,
197 dex,
198 instrument_id,
199 pool_address,
200 block,
201 transaction_hash,
202 transaction_index,
203 log_index,
204 None, sender,
206 recipient,
207 amount0,
208 amount1,
209 sqrt_price_x96,
210 swap_liquidity,
211 swap_tick,
212 None,
213 None,
214 None,
215 );
216
217 Ok(DexPoolData::Swap(pool_swap))
218 }
219 "liquidity" => {
220 let kind_str = row
221 .try_get::<Option<String>, _>("liquidity_event_type")?
222 .ok_or_else(|| {
223 sqlx::Error::Decode("Missing liquidity_event_type for liquidity event".into())
224 })?;
225
226 let kind = match kind_str.as_str() {
227 "Mint" => PoolLiquidityUpdateType::Mint,
228 "Burn" => PoolLiquidityUpdateType::Burn,
229 _ => {
230 return Err(sqlx::Error::Decode(
231 format!("Unknown liquidity update type: {}", kind_str).into(),
232 ));
233 }
234 };
235
236 let sender = row
237 .try_get::<Option<String>, _>("sender")?
238 .map(|s| validate_address(&s))
239 .transpose()
240 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
241
242 let owner_str = row
243 .try_get::<Option<String>, _>("owner")?
244 .ok_or_else(|| sqlx::Error::Decode("Missing owner for liquidity event".into()))?;
245 let owner = validate_address(&owner_str)
246 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
247
248 let position_liquidity_str = row.try_get::<String, _>("position_liquidity")?;
250 let position_liquidity =
251 u128::from_str_radix(&position_liquidity_str, 10).map_err(|e| {
252 sqlx::Error::Decode(
253 format!(
254 "Invalid position_liquidity '{}': {}",
255 position_liquidity_str, e
256 )
257 .into(),
258 )
259 })?;
260
261 let amount0_str = row.try_get::<String, _>("amount0")?;
262 let amount0 = U256::from_str_radix(&amount0_str, 10).map_err(|e| {
263 sqlx::Error::Decode(format!("Invalid amount0 '{}': {}", amount0_str, e).into())
264 })?;
265
266 let amount1_str = row.try_get::<String, _>("amount1")?;
267 let amount1 = U256::from_str_radix(&amount1_str, 10).map_err(|e| {
268 sqlx::Error::Decode(format!("Invalid amount1 '{}': {}", amount1_str, e).into())
269 })?;
270
271 let tick_lower = row
272 .try_get::<Option<i32>, _>("tick_lower")?
273 .ok_or_else(|| {
274 sqlx::Error::Decode("Missing tick_lower for liquidity event".into())
275 })?;
276
277 let tick_upper = row
278 .try_get::<Option<i32>, _>("tick_upper")?
279 .ok_or_else(|| {
280 sqlx::Error::Decode("Missing tick_upper for liquidity event".into())
281 })?;
282
283 let pool_liquidity_update = PoolLiquidityUpdate::new(
284 chain,
285 dex,
286 instrument_id,
287 pool_address,
288 kind,
289 block,
290 transaction_hash,
291 transaction_index,
292 log_index,
293 sender,
294 owner,
295 position_liquidity,
296 amount0,
297 amount1,
298 tick_lower,
299 tick_upper,
300 None, );
302
303 Ok(DexPoolData::LiquidityUpdate(pool_liquidity_update))
304 }
305 "collect" => {
306 let owner_str = row
307 .try_get::<Option<String>, _>("owner")?
308 .ok_or_else(|| sqlx::Error::Decode("Missing owner for collect event".into()))?;
309 let owner = validate_address(&owner_str)
310 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
311
312 let amount0_str = row.try_get::<String, _>("amount0")?;
314 let amount0 = u128::from_str_radix(&amount0_str, 10).map_err(|e| {
315 sqlx::Error::Decode(format!("Invalid amount0 '{}': {}", amount0_str, e).into())
316 })?;
317
318 let amount1_str = row.try_get::<String, _>("amount1")?;
319 let amount1 = u128::from_str_radix(&amount1_str, 10).map_err(|e| {
320 sqlx::Error::Decode(format!("Invalid amount1 '{}': {}", amount1_str, e).into())
321 })?;
322
323 let tick_lower = row
324 .try_get::<Option<i32>, _>("tick_lower")?
325 .ok_or_else(|| {
326 sqlx::Error::Decode("Missing tick_lower for collect event".into())
327 })?;
328
329 let tick_upper = row
330 .try_get::<Option<i32>, _>("tick_upper")?
331 .ok_or_else(|| {
332 sqlx::Error::Decode("Missing tick_upper for collect event".into())
333 })?;
334
335 let pool_fee_collect = PoolFeeCollect::new(
336 chain,
337 dex,
338 instrument_id,
339 pool_address,
340 block,
341 transaction_hash,
342 transaction_index,
343 log_index,
344 owner,
345 amount0,
346 amount1,
347 tick_lower,
348 tick_upper,
349 None, );
351
352 Ok(DexPoolData::FeeCollect(pool_fee_collect))
353 }
354 "flash" => {
355 let sender_str = row
356 .try_get::<Option<String>, _>("sender")?
357 .ok_or_else(|| sqlx::Error::Decode("Missing sender for flash event".into()))?;
358 let sender = validate_address(&sender_str)
359 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
360
361 let recipient_str = row
362 .try_get::<Option<String>, _>("recipient")?
363 .ok_or_else(|| sqlx::Error::Decode("Missing recipient for flash event".into()))?;
364 let recipient = validate_address(&recipient_str)
365 .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
366
367 let flash_amount0_str = row.try_get::<String, _>("flash_amount0")?;
369 let amount0 = U256::from_str_radix(&flash_amount0_str, 10).map_err(|e| {
370 sqlx::Error::Decode(
371 format!("Invalid flash_amount0 '{}': {}", flash_amount0_str, e).into(),
372 )
373 })?;
374
375 let flash_amount1_str = row.try_get::<String, _>("flash_amount1")?;
376 let amount1 = U256::from_str_radix(&flash_amount1_str, 10).map_err(|e| {
377 sqlx::Error::Decode(
378 format!("Invalid flash_amount1 '{}': {}", flash_amount1_str, e).into(),
379 )
380 })?;
381
382 let flash_paid0_str = row.try_get::<String, _>("flash_paid0")?;
383 let paid0 = U256::from_str_radix(&flash_paid0_str, 10).map_err(|e| {
384 sqlx::Error::Decode(
385 format!("Invalid flash_paid0 '{}': {}", flash_paid0_str, e).into(),
386 )
387 })?;
388
389 let flash_paid1_str = row.try_get::<String, _>("flash_paid1")?;
390 let paid1 = U256::from_str_radix(&flash_paid1_str, 10).map_err(|e| {
391 sqlx::Error::Decode(
392 format!("Invalid flash_paid1 '{}': {}", flash_paid1_str, e).into(),
393 )
394 })?;
395
396 let pool_flash = PoolFlash::new(
397 chain,
398 dex,
399 instrument_id,
400 pool_address,
401 block,
402 transaction_hash,
403 transaction_index,
404 log_index,
405 None, sender,
407 recipient,
408 amount0,
409 amount1,
410 paid0,
411 paid1,
412 );
413
414 Ok(DexPoolData::Flash(pool_flash))
415 }
416 _ => Err(sqlx::Error::Decode(
417 format!("Unknown event type: {}", event_type).into(),
418 )),
419 }
420}