nautilus_blockchain/cache/
rows.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use alloy::primitives::{Address, I256, U160, U256};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    defi::{
22        PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap,
23        data::{DexPoolData, PoolFeeCollect, PoolFlash},
24        validation::validate_address,
25    },
26    identifiers::InstrumentId,
27};
28use sqlx::{FromRow, Row, postgres::PgRow};
29
30/// A data transfer object that maps database rows to token data.
31///
32/// Implements `FromRow` trait to automatically convert PostgreSQL results into `TokenRow`
33/// objects that can be transformed into domain entity `Token` objects.
34#[derive(Debug)]
35pub struct TokenRow {
36    pub address: Address,
37    pub name: String,
38    pub symbol: String,
39    pub decimals: i32,
40}
41
42impl<'r> FromRow<'r, PgRow> for TokenRow {
43    fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
44        let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
45        let name = row.try_get::<String, _>("name")?;
46        let symbol = row.try_get::<String, _>("symbol")?;
47        let decimals = row.try_get::<i32, _>("decimals")?;
48
49        let token = Self {
50            address,
51            name,
52            symbol,
53            decimals,
54        };
55        Ok(token)
56    }
57}
58
59#[derive(Debug)]
60pub struct PoolRow {
61    pub address: Address,
62    pub dex_name: String,
63    pub creation_block: i64,
64    pub token0_chain: i32,
65    pub token0_address: Address,
66    pub token1_chain: i32,
67    pub token1_address: Address,
68    pub fee: Option<i32>,
69    pub tick_spacing: Option<i32>,
70    pub initial_tick: Option<i32>,
71    pub initial_sqrt_price_x96: Option<String>,
72}
73
74impl<'r> FromRow<'r, PgRow> for PoolRow {
75    fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
76        let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
77        let dex_name = row.try_get::<String, _>("dex_name")?;
78        let creation_block = row.try_get::<i64, _>("creation_block")?;
79        let token0_chain = row.try_get::<i32, _>("token0_chain")?;
80        let token0_address =
81            validate_address(row.try_get::<String, _>("token0_address")?.as_str()).unwrap();
82        let token1_chain = row.try_get::<i32, _>("token1_chain")?;
83        let token1_address =
84            validate_address(row.try_get::<String, _>("token1_address")?.as_str()).unwrap();
85        let fee = row.try_get::<Option<i32>, _>("fee")?;
86        let tick_spacing = row.try_get::<Option<i32>, _>("tick_spacing")?;
87        let initial_tick = row.try_get::<Option<i32>, _>("initial_tick")?;
88        let initial_sqrt_price_x96 = row.try_get::<Option<String>, _>("initial_sqrt_price_x96")?;
89
90        Ok(Self {
91            address,
92            dex_name,
93            creation_block,
94            token0_chain,
95            token0_address,
96            token1_chain,
97            token1_address,
98            fee,
99            tick_spacing,
100            initial_tick,
101            initial_sqrt_price_x96,
102        })
103    }
104}
105
106/// A data transfer object that maps database rows to block timestamp data.
107#[derive(Debug)]
108pub struct BlockTimestampRow {
109    /// The block number.
110    pub number: u64,
111    /// The block timestamp.
112    pub timestamp: UnixNanos,
113}
114
115impl FromRow<'_, PgRow> for BlockTimestampRow {
116    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
117        let number = row.try_get::<i64, _>("number")? as u64;
118        let timestamp = row.try_get::<String, _>("timestamp")?;
119        Ok(Self {
120            number,
121            timestamp: UnixNanos::from(timestamp),
122        })
123    }
124}
125
126/// Transforms a database row from the pool events UNION query into a DexPoolData enum variant.
127///
128/// This function directly processes a PostgreSQL row and creates the appropriate DexPoolData
129/// variant based on the event_type discriminator field, using the provided context.
130pub fn transform_row_to_dex_pool_data(
131    row: &PgRow,
132    chain: nautilus_model::defi::SharedChain,
133    dex: nautilus_model::defi::SharedDex,
134    instrument_id: InstrumentId,
135) -> Result<DexPoolData, sqlx::Error> {
136    let event_type = row.try_get::<String, _>("event_type")?;
137    let pool_address_str = row.try_get::<String, _>("pool_address")?;
138    let block = row.try_get::<i64, _>("block")? as u64;
139    let transaction_hash = row.try_get::<String, _>("transaction_hash")?;
140    let transaction_index = row.try_get::<i32, _>("transaction_index")? as u32;
141    let log_index = row.try_get::<i32, _>("log_index")? as u32;
142
143    let pool_address = validate_address(&pool_address_str)
144        .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
145
146    match event_type.as_str() {
147        "swap" => {
148            let sender_str = row
149                .try_get::<Option<String>, _>("sender")?
150                .ok_or_else(|| sqlx::Error::Decode("Missing sender for swap event".into()))?;
151            let sender = validate_address(&sender_str)
152                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
153
154            let recipient_str = row
155                .try_get::<Option<String>, _>("recipient")?
156                .ok_or_else(|| sqlx::Error::Decode("Missing recipient for swap event".into()))?;
157            let recipient = validate_address(&recipient_str)
158                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
159
160            let sqrt_price_x96_str = row
161                .try_get::<Option<String>, _>("sqrt_price_x96")?
162                .ok_or_else(|| {
163                    sqlx::Error::Decode("Missing sqrt_price_x96 for swap event".into())
164                })?;
165            let sqrt_price_x96 = U160::from_str(&sqrt_price_x96_str).map_err(|e| {
166                sqlx::Error::Decode(
167                    format!("Invalid sqrt_price_x96 '{}': {}", sqrt_price_x96_str, e).into(),
168                )
169            })?;
170
171            let swap_liquidity_str = row.try_get::<String, _>("swap_liquidity")?;
172            let swap_liquidity = u128::from_str(&swap_liquidity_str)
173                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
174
175            let swap_tick = row.try_get::<i32, _>("swap_tick")?;
176
177            let swap_amount0_str = row
178                .try_get::<Option<String>, _>("swap_amount0")?
179                .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount0 for swap event".into()))?;
180            let amount0 = I256::from_str(&swap_amount0_str).map_err(|e| {
181                sqlx::Error::Decode(
182                    format!("Invalid swap_amount0 '{}': {}", swap_amount0_str, e).into(),
183                )
184            })?;
185
186            let swap_amount1_str = row
187                .try_get::<Option<String>, _>("swap_amount1")?
188                .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount1 for swap event".into()))?;
189            let amount1 = I256::from_str(&swap_amount1_str).map_err(|e| {
190                sqlx::Error::Decode(
191                    format!("Invalid swap_amount1 '{}': {}", swap_amount1_str, e).into(),
192                )
193            })?;
194
195            let pool_swap = PoolSwap::new(
196                chain,
197                dex,
198                instrument_id,
199                pool_address,
200                block,
201                transaction_hash,
202                transaction_index,
203                log_index,
204                None, // timestamp
205                sender,
206                recipient,
207                amount0,
208                amount1,
209                sqrt_price_x96,
210                swap_liquidity,
211                swap_tick,
212                None,
213                None,
214                None,
215            );
216
217            Ok(DexPoolData::Swap(pool_swap))
218        }
219        "liquidity" => {
220            let kind_str = row
221                .try_get::<Option<String>, _>("liquidity_event_type")?
222                .ok_or_else(|| {
223                    sqlx::Error::Decode("Missing liquidity_event_type for liquidity event".into())
224                })?;
225
226            let kind = match kind_str.as_str() {
227                "Mint" => PoolLiquidityUpdateType::Mint,
228                "Burn" => PoolLiquidityUpdateType::Burn,
229                _ => {
230                    return Err(sqlx::Error::Decode(
231                        format!("Unknown liquidity update type: {}", kind_str).into(),
232                    ));
233                }
234            };
235
236            let sender = row
237                .try_get::<Option<String>, _>("sender")?
238                .map(|s| validate_address(&s))
239                .transpose()
240                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
241
242            let owner_str = row
243                .try_get::<Option<String>, _>("owner")?
244                .ok_or_else(|| sqlx::Error::Decode("Missing owner for liquidity event".into()))?;
245            let owner = validate_address(&owner_str)
246                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
247
248            // UNION queries return NUMERIC type, not domain types, so we need to read as strings
249            let position_liquidity_str = row.try_get::<String, _>("position_liquidity")?;
250            let position_liquidity =
251                u128::from_str_radix(&position_liquidity_str, 10).map_err(|e| {
252                    sqlx::Error::Decode(
253                        format!(
254                            "Invalid position_liquidity '{}': {}",
255                            position_liquidity_str, e
256                        )
257                        .into(),
258                    )
259                })?;
260
261            let amount0_str = row.try_get::<String, _>("amount0")?;
262            let amount0 = U256::from_str_radix(&amount0_str, 10).map_err(|e| {
263                sqlx::Error::Decode(format!("Invalid amount0 '{}': {}", amount0_str, e).into())
264            })?;
265
266            let amount1_str = row.try_get::<String, _>("amount1")?;
267            let amount1 = U256::from_str_radix(&amount1_str, 10).map_err(|e| {
268                sqlx::Error::Decode(format!("Invalid amount1 '{}': {}", amount1_str, e).into())
269            })?;
270
271            let tick_lower = row
272                .try_get::<Option<i32>, _>("tick_lower")?
273                .ok_or_else(|| {
274                    sqlx::Error::Decode("Missing tick_lower for liquidity event".into())
275                })?;
276
277            let tick_upper = row
278                .try_get::<Option<i32>, _>("tick_upper")?
279                .ok_or_else(|| {
280                    sqlx::Error::Decode("Missing tick_upper for liquidity event".into())
281                })?;
282
283            let pool_liquidity_update = PoolLiquidityUpdate::new(
284                chain,
285                dex,
286                instrument_id,
287                pool_address,
288                kind,
289                block,
290                transaction_hash,
291                transaction_index,
292                log_index,
293                sender,
294                owner,
295                position_liquidity,
296                amount0,
297                amount1,
298                tick_lower,
299                tick_upper,
300                None, // timestamp
301            );
302
303            Ok(DexPoolData::LiquidityUpdate(pool_liquidity_update))
304        }
305        "collect" => {
306            let owner_str = row
307                .try_get::<Option<String>, _>("owner")?
308                .ok_or_else(|| sqlx::Error::Decode("Missing owner for collect event".into()))?;
309            let owner = validate_address(&owner_str)
310                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
311
312            // UNION queries return NUMERIC type, not domain types, so we need to read as strings
313            let amount0_str = row.try_get::<String, _>("amount0")?;
314            let amount0 = u128::from_str_radix(&amount0_str, 10).map_err(|e| {
315                sqlx::Error::Decode(format!("Invalid amount0 '{}': {}", amount0_str, e).into())
316            })?;
317
318            let amount1_str = row.try_get::<String, _>("amount1")?;
319            let amount1 = u128::from_str_radix(&amount1_str, 10).map_err(|e| {
320                sqlx::Error::Decode(format!("Invalid amount1 '{}': {}", amount1_str, e).into())
321            })?;
322
323            let tick_lower = row
324                .try_get::<Option<i32>, _>("tick_lower")?
325                .ok_or_else(|| {
326                    sqlx::Error::Decode("Missing tick_lower for collect event".into())
327                })?;
328
329            let tick_upper = row
330                .try_get::<Option<i32>, _>("tick_upper")?
331                .ok_or_else(|| {
332                    sqlx::Error::Decode("Missing tick_upper for collect event".into())
333                })?;
334
335            let pool_fee_collect = PoolFeeCollect::new(
336                chain,
337                dex,
338                instrument_id,
339                pool_address,
340                block,
341                transaction_hash,
342                transaction_index,
343                log_index,
344                owner,
345                amount0,
346                amount1,
347                tick_lower,
348                tick_upper,
349                None, // timestamp
350            );
351
352            Ok(DexPoolData::FeeCollect(pool_fee_collect))
353        }
354        "flash" => {
355            let sender_str = row
356                .try_get::<Option<String>, _>("sender")?
357                .ok_or_else(|| sqlx::Error::Decode("Missing sender for flash event".into()))?;
358            let sender = validate_address(&sender_str)
359                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
360
361            let recipient_str = row
362                .try_get::<Option<String>, _>("recipient")?
363                .ok_or_else(|| sqlx::Error::Decode("Missing recipient for flash event".into()))?;
364            let recipient = validate_address(&recipient_str)
365                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
366
367            // For flash events, we have flash_amount0, flash_amount1, flash_paid0, flash_paid1
368            let flash_amount0_str = row.try_get::<String, _>("flash_amount0")?;
369            let amount0 = U256::from_str_radix(&flash_amount0_str, 10).map_err(|e| {
370                sqlx::Error::Decode(
371                    format!("Invalid flash_amount0 '{}': {}", flash_amount0_str, e).into(),
372                )
373            })?;
374
375            let flash_amount1_str = row.try_get::<String, _>("flash_amount1")?;
376            let amount1 = U256::from_str_radix(&flash_amount1_str, 10).map_err(|e| {
377                sqlx::Error::Decode(
378                    format!("Invalid flash_amount1 '{}': {}", flash_amount1_str, e).into(),
379                )
380            })?;
381
382            let flash_paid0_str = row.try_get::<String, _>("flash_paid0")?;
383            let paid0 = U256::from_str_radix(&flash_paid0_str, 10).map_err(|e| {
384                sqlx::Error::Decode(
385                    format!("Invalid flash_paid0 '{}': {}", flash_paid0_str, e).into(),
386                )
387            })?;
388
389            let flash_paid1_str = row.try_get::<String, _>("flash_paid1")?;
390            let paid1 = U256::from_str_radix(&flash_paid1_str, 10).map_err(|e| {
391                sqlx::Error::Decode(
392                    format!("Invalid flash_paid1 '{}': {}", flash_paid1_str, e).into(),
393                )
394            })?;
395
396            let pool_flash = PoolFlash::new(
397                chain,
398                dex,
399                instrument_id,
400                pool_address,
401                block,
402                transaction_hash,
403                transaction_index,
404                log_index,
405                None, // timestamp
406                sender,
407                recipient,
408                amount0,
409                amount1,
410                paid0,
411                paid1,
412            );
413
414            Ok(DexPoolData::Flash(pool_flash))
415        }
416        _ => Err(sqlx::Error::Decode(
417            format!("Unknown event type: {}", event_type).into(),
418        )),
419    }
420}