nautilus_blockchain/cache/
rows.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use alloy::primitives::{Address, I256, U160, U256};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    defi::{
22        PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap,
23        data::{DexPoolData, PoolFeeCollect, PoolFlash},
24        validation::validate_address,
25    },
26    identifiers::InstrumentId,
27};
28use sqlx::{FromRow, Row, postgres::PgRow};
29
30/// A data transfer object that maps database rows to token data.
31///
32/// Implements `FromRow` trait to automatically convert PostgreSQL results into `TokenRow`
33/// objects that can be transformed into domain entity `Token` objects.
34#[derive(Debug)]
35pub struct TokenRow {
36    pub address: Address,
37    pub name: String,
38    pub symbol: String,
39    pub decimals: i32,
40}
41
42impl<'r> FromRow<'r, PgRow> for TokenRow {
43    fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
44        let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
45        let name = row.try_get::<String, _>("name")?;
46        let symbol = row.try_get::<String, _>("symbol")?;
47        let decimals = row.try_get::<i32, _>("decimals")?;
48
49        let token = Self {
50            address,
51            name,
52            symbol,
53            decimals,
54        };
55        Ok(token)
56    }
57}
58
59#[derive(Debug)]
60pub struct PoolRow {
61    pub address: Address,
62    pub dex_name: String,
63    pub creation_block: i64,
64    pub token0_chain: i32,
65    pub token0_address: Address,
66    pub token1_chain: i32,
67    pub token1_address: Address,
68    pub fee: Option<i32>,
69    pub tick_spacing: Option<i32>,
70    pub initial_tick: Option<i32>,
71    pub initial_sqrt_price_x96: Option<String>,
72}
73
74impl<'r> FromRow<'r, PgRow> for PoolRow {
75    fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
76        let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
77        let dex_name = row.try_get::<String, _>("dex_name")?;
78        let creation_block = row.try_get::<i64, _>("creation_block")?;
79        let token0_chain = row.try_get::<i32, _>("token0_chain")?;
80        let token0_address =
81            validate_address(row.try_get::<String, _>("token0_address")?.as_str()).unwrap();
82        let token1_chain = row.try_get::<i32, _>("token1_chain")?;
83        let token1_address =
84            validate_address(row.try_get::<String, _>("token1_address")?.as_str()).unwrap();
85        let fee = row.try_get::<Option<i32>, _>("fee")?;
86        let tick_spacing = row.try_get::<Option<i32>, _>("tick_spacing")?;
87        let initial_tick = row.try_get::<Option<i32>, _>("initial_tick")?;
88        let initial_sqrt_price_x96 = row.try_get::<Option<String>, _>("initial_sqrt_price_x96")?;
89
90        Ok(Self {
91            address,
92            dex_name,
93            creation_block,
94            token0_chain,
95            token0_address,
96            token1_chain,
97            token1_address,
98            fee,
99            tick_spacing,
100            initial_tick,
101            initial_sqrt_price_x96,
102        })
103    }
104}
105
106/// A data transfer object that maps database rows to block timestamp data.
107#[derive(Debug)]
108pub struct BlockTimestampRow {
109    /// The block number.
110    pub number: u64,
111    /// The block timestamp.
112    pub timestamp: UnixNanos,
113}
114
115impl FromRow<'_, PgRow> for BlockTimestampRow {
116    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
117        let number = row.try_get::<i64, _>("number")? as u64;
118        let timestamp = row.try_get::<String, _>("timestamp")?;
119        Ok(Self {
120            number,
121            timestamp: UnixNanos::from(timestamp),
122        })
123    }
124}
125
126/// Transforms a database row from the pool events UNION query into a DexPoolData enum variant.
127///
128/// This function directly processes a PostgreSQL row and creates the appropriate DexPoolData
129/// variant based on the event_type discriminator field, using the provided context.
130///
131/// # Errors
132///
133/// Returns an error if row field extraction fails or data validation fails.
134pub fn transform_row_to_dex_pool_data(
135    row: &PgRow,
136    chain: nautilus_model::defi::SharedChain,
137    dex: nautilus_model::defi::SharedDex,
138    instrument_id: InstrumentId,
139) -> Result<DexPoolData, sqlx::Error> {
140    let event_type = row.try_get::<String, _>("event_type")?;
141    let pool_address_str = row.try_get::<String, _>("pool_address")?;
142    let block = row.try_get::<i64, _>("block")? as u64;
143    let transaction_hash = row.try_get::<String, _>("transaction_hash")?;
144    let transaction_index = row.try_get::<i32, _>("transaction_index")? as u32;
145    let log_index = row.try_get::<i32, _>("log_index")? as u32;
146
147    let pool_address = validate_address(&pool_address_str)
148        .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
149
150    match event_type.as_str() {
151        "swap" => {
152            let sender_str = row
153                .try_get::<Option<String>, _>("sender")?
154                .ok_or_else(|| sqlx::Error::Decode("Missing sender for swap event".into()))?;
155            let sender = validate_address(&sender_str)
156                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
157
158            let recipient_str = row
159                .try_get::<Option<String>, _>("recipient")?
160                .ok_or_else(|| sqlx::Error::Decode("Missing recipient for swap event".into()))?;
161            let recipient = validate_address(&recipient_str)
162                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
163
164            let sqrt_price_x96_str = row
165                .try_get::<Option<String>, _>("sqrt_price_x96")?
166                .ok_or_else(|| {
167                    sqlx::Error::Decode("Missing sqrt_price_x96 for swap event".into())
168                })?;
169            let sqrt_price_x96 = U160::from_str(&sqrt_price_x96_str).map_err(|e| {
170                sqlx::Error::Decode(
171                    format!("Invalid sqrt_price_x96 '{}': {}", sqrt_price_x96_str, e).into(),
172                )
173            })?;
174
175            let swap_liquidity_str = row.try_get::<String, _>("swap_liquidity")?;
176            let swap_liquidity = u128::from_str(&swap_liquidity_str)
177                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
178
179            let swap_tick = row.try_get::<i32, _>("swap_tick")?;
180
181            let swap_amount0_str = row
182                .try_get::<Option<String>, _>("swap_amount0")?
183                .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount0 for swap event".into()))?;
184            let amount0 = I256::from_str(&swap_amount0_str).map_err(|e| {
185                sqlx::Error::Decode(
186                    format!("Invalid swap_amount0 '{}': {}", swap_amount0_str, e).into(),
187                )
188            })?;
189
190            let swap_amount1_str = row
191                .try_get::<Option<String>, _>("swap_amount1")?
192                .ok_or_else(|| sqlx::Error::Decode("Missing swap_amount1 for swap event".into()))?;
193            let amount1 = I256::from_str(&swap_amount1_str).map_err(|e| {
194                sqlx::Error::Decode(
195                    format!("Invalid swap_amount1 '{}': {}", swap_amount1_str, e).into(),
196                )
197            })?;
198
199            let pool_swap = PoolSwap::new(
200                chain,
201                dex,
202                instrument_id,
203                pool_address,
204                block,
205                transaction_hash,
206                transaction_index,
207                log_index,
208                None, // timestamp
209                sender,
210                recipient,
211                amount0,
212                amount1,
213                sqrt_price_x96,
214                swap_liquidity,
215                swap_tick,
216            );
217
218            Ok(DexPoolData::Swap(pool_swap))
219        }
220        "liquidity" => {
221            let kind_str = row
222                .try_get::<Option<String>, _>("liquidity_event_type")?
223                .ok_or_else(|| {
224                    sqlx::Error::Decode("Missing liquidity_event_type for liquidity event".into())
225                })?;
226
227            let kind = match kind_str.as_str() {
228                "Mint" => PoolLiquidityUpdateType::Mint,
229                "Burn" => PoolLiquidityUpdateType::Burn,
230                _ => {
231                    return Err(sqlx::Error::Decode(
232                        format!("Unknown liquidity update type: {}", kind_str).into(),
233                    ));
234                }
235            };
236
237            let sender = row
238                .try_get::<Option<String>, _>("sender")?
239                .map(|s| validate_address(&s))
240                .transpose()
241                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
242
243            let owner_str = row
244                .try_get::<Option<String>, _>("owner")?
245                .ok_or_else(|| sqlx::Error::Decode("Missing owner for liquidity event".into()))?;
246            let owner = validate_address(&owner_str)
247                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
248
249            // UNION queries return NUMERIC type, not domain types, so we need to read as strings
250            let position_liquidity_str = row.try_get::<String, _>("position_liquidity")?;
251            let position_liquidity = position_liquidity_str.parse::<u128>().map_err(|e| {
252                sqlx::Error::Decode(
253                    format!(
254                        "Invalid position_liquidity '{}': {}",
255                        position_liquidity_str, e
256                    )
257                    .into(),
258                )
259            })?;
260
261            let amount0_str = row.try_get::<String, _>("amount0")?;
262            let amount0 = U256::from_str_radix(&amount0_str, 10).map_err(|e| {
263                sqlx::Error::Decode(format!("Invalid amount0 '{}': {}", amount0_str, e).into())
264            })?;
265
266            let amount1_str = row.try_get::<String, _>("amount1")?;
267            let amount1 = U256::from_str_radix(&amount1_str, 10).map_err(|e| {
268                sqlx::Error::Decode(format!("Invalid amount1 '{}': {}", amount1_str, e).into())
269            })?;
270
271            let tick_lower = row
272                .try_get::<Option<i32>, _>("tick_lower")?
273                .ok_or_else(|| {
274                    sqlx::Error::Decode("Missing tick_lower for liquidity event".into())
275                })?;
276
277            let tick_upper = row
278                .try_get::<Option<i32>, _>("tick_upper")?
279                .ok_or_else(|| {
280                    sqlx::Error::Decode("Missing tick_upper for liquidity event".into())
281                })?;
282
283            let pool_liquidity_update = PoolLiquidityUpdate::new(
284                chain,
285                dex,
286                instrument_id,
287                pool_address,
288                kind,
289                block,
290                transaction_hash,
291                transaction_index,
292                log_index,
293                sender,
294                owner,
295                position_liquidity,
296                amount0,
297                amount1,
298                tick_lower,
299                tick_upper,
300                None, // timestamp
301            );
302
303            Ok(DexPoolData::LiquidityUpdate(pool_liquidity_update))
304        }
305        "collect" => {
306            let owner_str = row
307                .try_get::<Option<String>, _>("owner")?
308                .ok_or_else(|| sqlx::Error::Decode("Missing owner for collect event".into()))?;
309            let owner = validate_address(&owner_str)
310                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
311
312            // UNION queries return NUMERIC type, not domain types, so we need to read as strings
313            let amount0_str = row.try_get::<String, _>("amount0")?;
314            let amount0 = amount0_str.parse::<u128>().map_err(|e| {
315                sqlx::Error::Decode(format!("Invalid amount0 '{}': {}", amount0_str, e).into())
316            })?;
317
318            let amount1_str = row.try_get::<String, _>("amount1")?;
319            let amount1 = amount1_str.parse::<u128>().map_err(|e| {
320                sqlx::Error::Decode(format!("Invalid amount1 '{}': {}", amount1_str, e).into())
321            })?;
322
323            let tick_lower = row
324                .try_get::<Option<i32>, _>("tick_lower")?
325                .ok_or_else(|| {
326                    sqlx::Error::Decode("Missing tick_lower for collect event".into())
327                })?;
328
329            let tick_upper = row
330                .try_get::<Option<i32>, _>("tick_upper")?
331                .ok_or_else(|| {
332                    sqlx::Error::Decode("Missing tick_upper for collect event".into())
333                })?;
334
335            let pool_fee_collect = PoolFeeCollect::new(
336                chain,
337                dex,
338                instrument_id,
339                pool_address,
340                block,
341                transaction_hash,
342                transaction_index,
343                log_index,
344                owner,
345                amount0,
346                amount1,
347                tick_lower,
348                tick_upper,
349                None, // timestamp
350            );
351
352            Ok(DexPoolData::FeeCollect(pool_fee_collect))
353        }
354        "flash" => {
355            let sender_str = row
356                .try_get::<Option<String>, _>("sender")?
357                .ok_or_else(|| sqlx::Error::Decode("Missing sender for flash event".into()))?;
358            let sender = validate_address(&sender_str)
359                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
360
361            let recipient_str = row
362                .try_get::<Option<String>, _>("recipient")?
363                .ok_or_else(|| sqlx::Error::Decode("Missing recipient for flash event".into()))?;
364            let recipient = validate_address(&recipient_str)
365                .map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
366
367            // For flash events, we have flash_amount0, flash_amount1, flash_paid0, flash_paid1
368            let flash_amount0_str = row.try_get::<String, _>("flash_amount0")?;
369            let amount0 = U256::from_str_radix(&flash_amount0_str, 10).map_err(|e| {
370                sqlx::Error::Decode(
371                    format!("Invalid flash_amount0 '{}': {}", flash_amount0_str, e).into(),
372                )
373            })?;
374
375            let flash_amount1_str = row.try_get::<String, _>("flash_amount1")?;
376            let amount1 = U256::from_str_radix(&flash_amount1_str, 10).map_err(|e| {
377                sqlx::Error::Decode(
378                    format!("Invalid flash_amount1 '{}': {}", flash_amount1_str, e).into(),
379                )
380            })?;
381
382            let flash_paid0_str = row.try_get::<String, _>("flash_paid0")?;
383            let paid0 = U256::from_str_radix(&flash_paid0_str, 10).map_err(|e| {
384                sqlx::Error::Decode(
385                    format!("Invalid flash_paid0 '{}': {}", flash_paid0_str, e).into(),
386                )
387            })?;
388
389            let flash_paid1_str = row.try_get::<String, _>("flash_paid1")?;
390            let paid1 = U256::from_str_radix(&flash_paid1_str, 10).map_err(|e| {
391                sqlx::Error::Decode(
392                    format!("Invalid flash_paid1 '{}': {}", flash_paid1_str, e).into(),
393                )
394            })?;
395
396            let pool_flash = PoolFlash::new(
397                chain,
398                dex,
399                instrument_id,
400                pool_address,
401                block,
402                transaction_hash,
403                transaction_index,
404                log_index,
405                None, // timestamp
406                sender,
407                recipient,
408                amount0,
409                amount1,
410                paid0,
411                paid1,
412            );
413
414            Ok(DexPoolData::Flash(pool_flash))
415        }
416        _ => Err(sqlx::Error::Decode(
417            format!("Unknown event type: {}", event_type).into(),
418        )),
419    }
420}