nautilus_blockchain/cache/
rows.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use alloy::primitives::{Address, I256, U160, U256};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    defi::{
22        PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap, SharedChain, SharedDex,
23        data::{DexPoolData, PoolFeeCollect, PoolFlash},
24        validation::validate_address,
25    },
26    identifiers::InstrumentId,
27};
28use sqlx::{FromRow, Row, postgres::PgRow};
29
/// A data transfer object that maps database rows to token data.
///
/// Implements `FromRow` trait to automatically convert PostgreSQL results into `TokenRow`
/// objects that can be transformed into domain entity `Token` objects.
#[derive(Debug)]
pub struct TokenRow {
    /// Address taken from the `address` column and passed through `validate_address`.
    pub address: Address,
    /// Value of the `name` column.
    pub name: String,
    /// Value of the `symbol` column.
    pub symbol: String,
    /// Value of the `decimals` column, kept as the signed 32-bit integer the row provides.
    pub decimals: i32,
}
41
42impl<'r> FromRow<'r, PgRow> for TokenRow {
43    fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
44        let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
45        let name = row.try_get::<String, _>("name")?;
46        let symbol = row.try_get::<String, _>("symbol")?;
47        let decimals = row.try_get::<i32, _>("decimals")?;
48
49        let token = Self {
50            address,
51            name,
52            symbol,
53            decimals,
54        };
55        Ok(token)
56    }
57}
58
/// A data transfer object that maps database rows to pool data.
///
/// Each field is populated from the column of the same name by the `FromRow`
/// implementation; `Option` fields correspond to columns read as nullable.
#[derive(Debug)]
pub struct PoolRow {
    /// Address taken from the `address` column and passed through `validate_address`.
    pub address: Address,
    /// Value of the `pool_identifier` column, kept as raw text.
    pub pool_identifier: String,
    /// Value of the `dex_name` column.
    pub dex_name: String,
    /// Value of the `creation_block` column.
    pub creation_block: i64,
    /// Value of the `token0_chain` column.
    pub token0_chain: i32,
    /// Address taken from the `token0_address` column and passed through `validate_address`.
    pub token0_address: Address,
    /// Value of the `token1_chain` column.
    pub token1_chain: i32,
    /// Address taken from the `token1_address` column and passed through `validate_address`.
    pub token1_address: Address,
    /// Value of the nullable `fee` column.
    pub fee: Option<i32>,
    /// Value of the nullable `tick_spacing` column.
    pub tick_spacing: Option<i32>,
    /// Value of the nullable `initial_tick` column.
    pub initial_tick: Option<i32>,
    /// Value of the nullable `initial_sqrt_price_x96` column, kept as raw text (not parsed here).
    pub initial_sqrt_price_x96: Option<String>,
    /// Value of the nullable `hook_address` column, kept as raw text (not validated here).
    pub hook_address: Option<String>,
}
75
76impl<'r> FromRow<'r, PgRow> for PoolRow {
77    fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
78        let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
79        let pool_identifier = row.try_get::<String, _>("pool_identifier")?;
80        let dex_name = row.try_get::<String, _>("dex_name")?;
81        let creation_block = row.try_get::<i64, _>("creation_block")?;
82        let token0_chain = row.try_get::<i32, _>("token0_chain")?;
83        let token0_address =
84            validate_address(row.try_get::<String, _>("token0_address")?.as_str()).unwrap();
85        let token1_chain = row.try_get::<i32, _>("token1_chain")?;
86        let token1_address =
87            validate_address(row.try_get::<String, _>("token1_address")?.as_str()).unwrap();
88        let fee = row.try_get::<Option<i32>, _>("fee")?;
89        let tick_spacing = row.try_get::<Option<i32>, _>("tick_spacing")?;
90        let initial_tick = row.try_get::<Option<i32>, _>("initial_tick")?;
91        let initial_sqrt_price_x96 = row.try_get::<Option<String>, _>("initial_sqrt_price_x96")?;
92        let hook_address = row.try_get::<Option<String>, _>("hook_address")?;
93
94        Ok(Self {
95            address,
96            pool_identifier,
97            dex_name,
98            creation_block,
99            token0_chain,
100            token0_address,
101            token1_chain,
102            token1_address,
103            fee,
104            tick_spacing,
105            initial_tick,
106            initial_sqrt_price_x96,
107            hook_address,
108        })
109    }
110}
111
/// A data transfer object that maps database rows to block timestamp data.
///
/// Populated from the `number` and `timestamp` columns by the `FromRow` implementation.
#[derive(Debug)]
pub struct BlockTimestampRow {
    /// The block number, converted from the signed 64-bit `number` column.
    pub number: u64,
    /// The block timestamp, converted from the text `timestamp` column.
    pub timestamp: UnixNanos,
}
120
121impl FromRow<'_, PgRow> for BlockTimestampRow {
122    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
123        let number = row.try_get::<i64, _>("number")? as u64;
124        let timestamp = row.try_get::<String, _>("timestamp")?;
125        Ok(Self {
126            number,
127            timestamp: UnixNanos::from(timestamp),
128        })
129    }
130}
131
132/// Transforms a database row from the pool events UNION query into a DexPoolData enum variant.
133///
134/// This function directly processes a PostgreSQL row and creates the appropriate DexPoolData
135/// variant based on the event_type discriminator field, using the provided context.
136///
137/// # Errors
138///
139/// Returns an error if row field extraction fails or data validation fails.
140pub fn transform_row_to_dex_pool_data(
141    row: &PgRow,
142    chain: SharedChain,
143    dex: SharedDex,
144    instrument_id: InstrumentId,
145) -> Result<DexPoolData, sqlx::Error> {
146    let event_type = row.try_get::<String, _>("event_type")?;
147    let pool_identifier_str = row.try_get::<String, _>("pool_identifier")?;
148    let pool_identifier = pool_identifier_str
149        .parse()
150        .map_err(|e| sqlx::Error::Decode(format!("Invalid pool identifier: {e}").into()))?;
151    let block = row.try_get::<i64, _>("block")? as u64;
152    let transaction_hash = row.try_get::<String, _>("transaction_hash")?;
153    let transaction_index = row.try_get::<i32, _>("transaction_index")? as u32;
154    let log_index = row.try_get::<i32, _>("log_index")? as u32;
155
156    match event_type.as_str() {
157        "swap" => {
158            let sender_str = row
159                .try_get::<Option<String>, _>("sender")?
160                .ok_or_else(|| sqlx::Error::Decode("Missing sender for swap event".into()))?;
161            let sender = validate_address(&sender_str)
162                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
163
164            let recipient_str = row
165                .try_get::<Option<String>, _>("recipient")?
166                .ok_or_else(|| sqlx::Error::Decode("Missing recipient for swap event".into()))?;
167            let recipient = validate_address(&recipient_str)
168                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
169
170            let sqrt_price_x96_str = row
171                .try_get::<Option<String>, _>("sqrt_price_x96")?
172                .ok_or_else(|| {
173                    sqlx::Error::Decode("Missing sqrt_price_x96 for swap event".into())
174                })?;
175            let sqrt_price_x96 = U160::from_str(&sqrt_price_x96_str).map_err(|e| {
176                sqlx::Error::Decode(
177                    format!("Invalid sqrt_price_x96 '{sqrt_price_x96_str}': {e}").into(),
178                )
179            })?;
180
181            let swap_liquidity_str = row.try_get::<String, _>("swap_liquidity")?;
182            let swap_liquidity = u128::from_str(&swap_liquidity_str)
183                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
184
185            let swap_tick = row.try_get::<i32, _>("swap_tick")?;
186
187            let swap_amount0_str = row
188                .try_get::<Option<String>, _>("swap_amount0")?
189                .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount0 for swap event".into()))?;
190            let amount0 = I256::from_str(&swap_amount0_str).map_err(|e| {
191                sqlx::Error::Decode(
192                    format!("Invalid swap_amount0 '{swap_amount0_str}': {e}").into(),
193                )
194            })?;
195
196            let swap_amount1_str = row
197                .try_get::<Option<String>, _>("swap_amount1")?
198                .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount1 for swap event".into()))?;
199            let amount1 = I256::from_str(&swap_amount1_str).map_err(|e| {
200                sqlx::Error::Decode(
201                    format!("Invalid swap_amount1 '{swap_amount1_str}': {e}").into(),
202                )
203            })?;
204
205            let pool_swap = PoolSwap::new(
206                chain,
207                dex,
208                instrument_id,
209                pool_identifier,
210                block,
211                transaction_hash,
212                transaction_index,
213                log_index,
214                None, // timestamp
215                sender,
216                recipient,
217                amount0,
218                amount1,
219                sqrt_price_x96,
220                swap_liquidity,
221                swap_tick,
222            );
223
224            Ok(DexPoolData::Swap(pool_swap))
225        }
226        "liquidity" => {
227            let kind_str = row
228                .try_get::<Option<String>, _>("liquidity_event_type")?
229                .ok_or_else(|| {
230                    sqlx::Error::Decode("Missing liquidity_event_type for liquidity event".into())
231                })?;
232
233            let kind = match kind_str.as_str() {
234                "Mint" => PoolLiquidityUpdateType::Mint,
235                "Burn" => PoolLiquidityUpdateType::Burn,
236                _ => {
237                    return Err(sqlx::Error::Decode(
238                        format!("Unknown liquidity update type: {kind_str}").into(),
239                    ));
240                }
241            };
242
243            let sender = row
244                .try_get::<Option<String>, _>("sender")?
245                .map(|s| validate_address(&s))
246                .transpose()
247                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
248
249            let owner_str = row
250                .try_get::<Option<String>, _>("owner")?
251                .ok_or_else(|| sqlx::Error::Decode("Missing owner for liquidity event".into()))?;
252            let owner = validate_address(&owner_str)
253                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
254
255            // UNION queries return NUMERIC type, not domain types, so we need to read as strings
256            let position_liquidity_str = row.try_get::<String, _>("position_liquidity")?;
257            let position_liquidity = position_liquidity_str.parse::<u128>().map_err(|e| {
258                sqlx::Error::Decode(
259                    format!("Invalid position_liquidity '{position_liquidity_str}': {e}").into(),
260                )
261            })?;
262
263            let amount0_str = row.try_get::<String, _>("amount0")?;
264            let amount0 = U256::from_str_radix(&amount0_str, 10).map_err(|e| {
265                sqlx::Error::Decode(format!("Invalid amount0 '{amount0_str}': {e}").into())
266            })?;
267
268            let amount1_str = row.try_get::<String, _>("amount1")?;
269            let amount1 = U256::from_str_radix(&amount1_str, 10).map_err(|e| {
270                sqlx::Error::Decode(format!("Invalid amount1 '{amount1_str}': {e}").into())
271            })?;
272
273            let tick_lower = row
274                .try_get::<Option<i32>, _>("tick_lower")?
275                .ok_or_else(|| {
276                    sqlx::Error::Decode("Missing tick_lower for liquidity event".into())
277                })?;
278
279            let tick_upper = row
280                .try_get::<Option<i32>, _>("tick_upper")?
281                .ok_or_else(|| {
282                    sqlx::Error::Decode("Missing tick_upper for liquidity event".into())
283                })?;
284
285            let pool_liquidity_update = PoolLiquidityUpdate::new(
286                chain,
287                dex,
288                instrument_id,
289                pool_identifier,
290                kind,
291                block,
292                transaction_hash,
293                transaction_index,
294                log_index,
295                sender,
296                owner,
297                position_liquidity,
298                amount0,
299                amount1,
300                tick_lower,
301                tick_upper,
302                None, // timestamp
303            );
304
305            Ok(DexPoolData::LiquidityUpdate(pool_liquidity_update))
306        }
307        "collect" => {
308            let owner_str = row
309                .try_get::<Option<String>, _>("owner")?
310                .ok_or_else(|| sqlx::Error::Decode("Missing owner for collect event".into()))?;
311            let owner = validate_address(&owner_str)
312                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
313
314            // UNION queries return NUMERIC type, not domain types, so we need to read as strings
315            let amount0_str = row.try_get::<String, _>("amount0")?;
316            let amount0 = amount0_str.parse::<u128>().map_err(|e| {
317                sqlx::Error::Decode(format!("Invalid amount0 '{amount0_str}': {e}").into())
318            })?;
319
320            let amount1_str = row.try_get::<String, _>("amount1")?;
321            let amount1 = amount1_str.parse::<u128>().map_err(|e| {
322                sqlx::Error::Decode(format!("Invalid amount1 '{amount1_str}': {e}").into())
323            })?;
324
325            let tick_lower = row
326                .try_get::<Option<i32>, _>("tick_lower")?
327                .ok_or_else(|| {
328                    sqlx::Error::Decode("Missing tick_lower for collect event".into())
329                })?;
330
331            let tick_upper = row
332                .try_get::<Option<i32>, _>("tick_upper")?
333                .ok_or_else(|| {
334                    sqlx::Error::Decode("Missing tick_upper for collect event".into())
335                })?;
336
337            let pool_fee_collect = PoolFeeCollect::new(
338                chain,
339                dex,
340                instrument_id,
341                pool_identifier,
342                block,
343                transaction_hash,
344                transaction_index,
345                log_index,
346                owner,
347                amount0,
348                amount1,
349                tick_lower,
350                tick_upper,
351                None, // timestamp
352            );
353
354            Ok(DexPoolData::FeeCollect(pool_fee_collect))
355        }
356        "flash" => {
357            let sender_str = row
358                .try_get::<Option<String>, _>("sender")?
359                .ok_or_else(|| sqlx::Error::Decode("Missing sender for flash event".into()))?;
360            let sender = validate_address(&sender_str)
361                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
362
363            let recipient_str = row
364                .try_get::<Option<String>, _>("recipient")?
365                .ok_or_else(|| sqlx::Error::Decode("Missing recipient for flash event".into()))?;
366            let recipient = validate_address(&recipient_str)
367                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
368
369            // For flash events, we have flash_amount0, flash_amount1, flash_paid0, flash_paid1
370            let flash_amount0_str = row.try_get::<String, _>("flash_amount0")?;
371            let amount0 = U256::from_str_radix(&flash_amount0_str, 10).map_err(|e| {
372                sqlx::Error::Decode(
373                    format!("Invalid flash_amount0 '{flash_amount0_str}': {e}").into(),
374                )
375            })?;
376
377            let flash_amount1_str = row.try_get::<String, _>("flash_amount1")?;
378            let amount1 = U256::from_str_radix(&flash_amount1_str, 10).map_err(|e| {
379                sqlx::Error::Decode(
380                    format!("Invalid flash_amount1 '{flash_amount1_str}': {e}").into(),
381                )
382            })?;
383
384            let flash_paid0_str = row.try_get::<String, _>("flash_paid0")?;
385            let paid0 = U256::from_str_radix(&flash_paid0_str, 10).map_err(|e| {
386                sqlx::Error::Decode(format!("Invalid flash_paid0 '{flash_paid0_str}': {e}").into())
387            })?;
388
389            let flash_paid1_str = row.try_get::<String, _>("flash_paid1")?;
390            let paid1 = U256::from_str_radix(&flash_paid1_str, 10).map_err(|e| {
391                sqlx::Error::Decode(format!("Invalid flash_paid1 '{flash_paid1_str}': {e}").into())
392            })?;
393
394            let pool_flash = PoolFlash::new(
395                chain,
396                dex,
397                instrument_id,
398                pool_identifier,
399                block,
400                transaction_hash,
401                transaction_index,
402                log_index,
403                None, // timestamp
404                sender,
405                recipient,
406                amount0,
407                amount1,
408                paid0,
409                paid1,
410            );
411
412            Ok(DexPoolData::Flash(pool_flash))
413        }
414        _ => Err(sqlx::Error::Decode(
415            format!("Unknown event type: {event_type}").into(),
416        )),
417    }
418}