Skip to main content

nautilus_data/defi/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific data engine functionality.
17//!
18//! This module provides DeFi processing methods for the `DataEngine`.
19//! All code in this module requires the `defi` feature flag.
20
21use std::{rc::Rc, sync::Arc};
22
23use nautilus_common::{
24    defi,
25    messages::defi::{
26        DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27    },
28    msgbus::{self, TypedHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32    defi::{
33        Blockchain, DefiData, PoolProfiler,
34        data::{DexPoolData, block::BlockPosition},
35    },
36    identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{
40    DataEngine,
41    pool::{
42        PoolCollectHandler, PoolFlashHandler, PoolLiquidityHandler, PoolSwapHandler, PoolUpdater,
43    },
44};
45
46/// Extracts the block position tuple from a DexPoolData event.
47fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
48    match event {
49        DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
50        DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
51        DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
52        DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
53    }
54}
55
56/// Converts buffered DefiData events to DexPoolData and sorts by block position.
57fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
58    let mut events: Vec<DexPoolData> = buffered_events
59        .into_iter()
60        .filter_map(|event| match event {
61            DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
62            DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
63            DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
64            DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
65            _ => None,
66        })
67        .collect();
68
69    events.sort_by(|a, b| {
70        let pos_a = get_event_block_position(a);
71        let pos_b = get_event_block_position(b);
72        pos_a.cmp(&pos_b)
73    });
74
75    events
76}
77
78impl DataEngine {
79    /// Returns all blockchains for which blocks subscriptions exist.
80    #[must_use]
81    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
82        self.collect_subscriptions(|client| &client.subscriptions_blocks)
83    }
84
85    /// Returns all instrument IDs for which pool subscriptions exist.
86    #[must_use]
87    pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
88        self.collect_subscriptions(|client| &client.subscriptions_pools)
89    }
90
91    /// Returns all instrument IDs for which swap subscriptions exist.
92    #[must_use]
93    pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
94        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
95    }
96
97    /// Returns all instrument IDs for which liquidity update subscriptions exist.
98    #[must_use]
99    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
100        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
101    }
102
103    /// Returns all instrument IDs for which fee collect subscriptions exist.
104    #[must_use]
105    pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
106        self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
107    }
108
109    /// Returns all instrument IDs for which flash loan subscriptions exist.
110    #[must_use]
111    pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
112        self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
113    }
114
115    /// Handles a subscribe command, updating internal state and forwarding to the client.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
120    /// or if the underlying client operation fails.
121    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
122        if let Some(client_id) = cmd.client_id()
123            && self.external_clients.contains(client_id)
124        {
125            if self.config.debug {
126                log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
127            }
128            return Ok(());
129        }
130
131        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
132            log::info!("Forwarding subscription to client {}", client.client_id);
133            client.execute_defi_subscribe(cmd);
134        } else {
135            log::error!(
136                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
137                cmd.client_id(),
138                cmd.venue(),
139            );
140        }
141
142        match cmd {
143            DefiSubscribeCommand::Pool(cmd) => {
144                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
145            }
146            DefiSubscribeCommand::PoolSwaps(cmd) => {
147                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
148            }
149            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
150                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
151            }
152            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
153                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
154            }
155            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
156                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
157            }
158            DefiSubscribeCommand::Blocks(_) => {} // No pool setup needed for blocks
159        }
160
161        Ok(())
162    }
163
164    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the underlying client operation fails.
169    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
170        if let Some(client_id) = cmd.client_id()
171            && self.external_clients.contains(client_id)
172        {
173            if self.config.debug {
174                log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
175            }
176            return Ok(());
177        }
178
179        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
180            client.execute_defi_unsubscribe(cmd);
181        } else {
182            log::error!(
183                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
184                cmd.client_id(),
185                cmd.venue(),
186            );
187        }
188
189        Ok(())
190    }
191
192    /// Sends a [`DefiRequestCommand`] to a suitable data client implementation.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if no client is found for the given client ID or venue,
197    /// or if the client fails to process the request.
198    pub fn execute_defi_request(&mut self, req: DefiRequestCommand) -> anyhow::Result<()> {
199        // Skip requests for external clients
200        if let Some(cid) = req.client_id()
201            && self.external_clients.contains(cid)
202        {
203            if self.config.debug {
204                log::debug!("Skipping defi data request for external client {cid}: {req:?}");
205            }
206            return Ok(());
207        }
208
209        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
210            client.execute_defi_request(req)
211        } else {
212            anyhow::bail!(
213                "Cannot handle request: no client found for {:?} {:?}",
214                req.client_id(),
215                req.venue()
216            );
217        }
218    }
219
220    /// Processes DeFi-specific data events.
221    pub fn process_defi_data(&mut self, data: DefiData) {
222        match data {
223            DefiData::Block(block) => {
224                let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
225                msgbus::publish_defi_block(topic, &block);
226            }
227            DefiData::Pool(pool) => {
228                if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
229                    log::error!("Failed to add Pool to cache: {e}");
230                }
231
232                // Check if pool profiler creation was deferred
233                if self.pool_updaters_pending.remove(&pool.instrument_id) {
234                    log::info!(
235                        "Pool {} now loaded, creating deferred pool profiler",
236                        pool.instrument_id
237                    );
238                    self.setup_pool_updater(&pool.instrument_id, None);
239                }
240
241                let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
242                msgbus::publish_defi_pool(topic, &pool);
243            }
244            DefiData::PoolSnapshot(snapshot) => {
245                let instrument_id = snapshot.instrument_id;
246                log::info!(
247                    "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
248                    snapshot.block_position.number,
249                    snapshot.positions.len(),
250                    snapshot.ticks.len()
251                );
252
253                // Validate we're expecting this snapshot
254                if !self.pool_snapshot_pending.contains(&instrument_id) {
255                    log::warn!(
256                        "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
257                    );
258                    return;
259                }
260
261                // Get pool from cache
262                let pool = match self.cache.borrow().pool(&instrument_id) {
263                    Some(pool) => Arc::new(pool.clone()),
264                    None => {
265                        log::error!(
266                            "Pool {instrument_id} not found in cache when processing snapshot"
267                        );
268                        return;
269                    }
270                };
271
272                // Create profiler and restore from snapshot
273                let mut profiler = PoolProfiler::new(pool);
274                if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
275                    log::error!(
276                        "Failed to restore profiler from snapshot for {instrument_id}: {e}"
277                    );
278                    return;
279                }
280                log::debug!("Restored pool profiler for {instrument_id} from snapshot");
281
282                // Process buffered events
283                let buffered_events = self
284                    .pool_event_buffers
285                    .remove(&instrument_id)
286                    .unwrap_or_default();
287
288                if !buffered_events.is_empty() {
289                    log::info!(
290                        "Processing {} buffered events for {instrument_id}",
291                        buffered_events.len()
292                    );
293
294                    let events_to_apply = convert_and_sort_buffered_events(buffered_events);
295                    let applied_count = Self::apply_buffered_events_to_profiler(
296                        &mut profiler,
297                        events_to_apply,
298                        &snapshot.block_position,
299                        instrument_id,
300                    );
301
302                    log::info!(
303                        "Applied {applied_count} buffered events to profiler for {instrument_id}"
304                    );
305                }
306
307                // Add profiler to cache
308                if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
309                    log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
310                    return;
311                }
312
313                // Create updater and subscribe to topics
314                self.pool_snapshot_pending.remove(&instrument_id);
315                let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
316
317                self.subscribe_pool_updater_topics(instrument_id, updater.clone());
318                self.pool_updaters.insert(instrument_id, updater);
319
320                log::info!(
321                    "Pool profiler setup completed for {instrument_id}, now processing live events"
322                );
323            }
324            DefiData::PoolSwap(swap) => {
325                let instrument_id = swap.instrument_id;
326                // Buffer if waiting for snapshot, otherwise publish
327                if self.pool_snapshot_pending.contains(&instrument_id) {
328                    log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
329                    self.pool_event_buffers
330                        .entry(instrument_id)
331                        .or_default()
332                        .push(DefiData::PoolSwap(swap));
333                } else {
334                    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
335                    msgbus::publish_defi_swap(topic, &swap);
336                }
337            }
338            DefiData::PoolLiquidityUpdate(update) => {
339                let instrument_id = update.instrument_id;
340                // Buffer if waiting for snapshot, otherwise publish
341                if self.pool_snapshot_pending.contains(&instrument_id) {
342                    log::debug!(
343                        "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
344                    );
345                    self.pool_event_buffers
346                        .entry(instrument_id)
347                        .or_default()
348                        .push(DefiData::PoolLiquidityUpdate(update));
349                } else {
350                    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
351                    msgbus::publish_defi_liquidity(topic, &update);
352                }
353            }
354            DefiData::PoolFeeCollect(collect) => {
355                let instrument_id = collect.instrument_id;
356                // Buffer if waiting for snapshot, otherwise publish
357                if self.pool_snapshot_pending.contains(&instrument_id) {
358                    log::debug!(
359                        "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
360                    );
361                    self.pool_event_buffers
362                        .entry(instrument_id)
363                        .or_default()
364                        .push(DefiData::PoolFeeCollect(collect));
365                } else {
366                    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
367                    msgbus::publish_defi_collect(topic, &collect);
368                }
369            }
370            DefiData::PoolFlash(flash) => {
371                let instrument_id = flash.instrument_id;
372                // Buffer if waiting for snapshot, otherwise publish
373                if self.pool_snapshot_pending.contains(&instrument_id) {
374                    log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
375                    self.pool_event_buffers
376                        .entry(instrument_id)
377                        .or_default()
378                        .push(DefiData::PoolFlash(flash));
379                } else {
380                    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
381                    msgbus::publish_defi_flash(topic, &flash);
382                }
383            }
384        }
385    }
386
387    /// Subscribes a pool updater to all relevant pool data topics using typed handlers.
388    fn subscribe_pool_updater_topics(&self, instrument_id: InstrumentId, updater: Rc<PoolUpdater>) {
389        let priority = Some(self.msgbus_priority);
390
391        // Subscribe swap handler
392        let swap_topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
393        let swap_handler = TypedHandler(Rc::new(PoolSwapHandler::new(updater.clone())));
394        msgbus::subscribe_defi_swaps(swap_topic.into(), swap_handler, priority);
395
396        // Subscribe liquidity handler
397        let liq_topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
398        let liq_handler = TypedHandler(Rc::new(PoolLiquidityHandler::new(updater.clone())));
399        msgbus::subscribe_defi_liquidity(liq_topic.into(), liq_handler, priority);
400
401        // Subscribe collect handler
402        let collect_topic = defi::switchboard::get_defi_collect_topic(instrument_id);
403        let collect_handler = TypedHandler(Rc::new(PoolCollectHandler::new(updater.clone())));
404        msgbus::subscribe_defi_collects(collect_topic.into(), collect_handler, priority);
405
406        // Subscribe flash handler
407        let flash_topic = defi::switchboard::get_defi_flash_topic(instrument_id);
408        let flash_handler = TypedHandler(Rc::new(PoolFlashHandler::new(updater)));
409        msgbus::subscribe_defi_flash(flash_topic.into(), flash_handler, priority);
410    }
411
412    /// Applies buffered events to a pool profiler, filtering to events after the snapshot.
413    ///
414    /// Returns the count of successfully applied events.
415    fn apply_buffered_events_to_profiler(
416        profiler: &mut PoolProfiler,
417        events: Vec<DexPoolData>,
418        snapshot_block: &BlockPosition,
419        instrument_id: InstrumentId,
420    ) -> usize {
421        let mut applied_count = 0;
422
423        for event in events {
424            let event_block = get_event_block_position(&event);
425
426            // Only apply events that occurred after the snapshot
427            let is_after_snapshot = event_block.0 > snapshot_block.number
428                || (event_block.0 == snapshot_block.number
429                    && event_block.1 > snapshot_block.transaction_index)
430                || (event_block.0 == snapshot_block.number
431                    && event_block.1 == snapshot_block.transaction_index
432                    && event_block.2 > snapshot_block.log_index);
433
434            if is_after_snapshot {
435                if let Err(e) = profiler.process(&event) {
436                    log::error!(
437                        "Failed to apply buffered event to profiler for {instrument_id}: {e}"
438                    );
439                } else {
440                    applied_count += 1;
441                }
442            }
443        }
444
445        applied_count
446    }
447
448    fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
449        // Early return if updater already exists or we are in the middle of setting it up.
450        if self.pool_updaters.contains_key(instrument_id)
451            || self.pool_updaters_pending.contains(instrument_id)
452        {
453            log::debug!("Pool updater for {instrument_id} already exists");
454            return;
455        }
456
457        log::info!("Setting up pool updater for {instrument_id}");
458
459        // Check cache state and ensure profiler exists
460        {
461            let mut cache = self.cache.borrow_mut();
462
463            if cache.pool_profiler(instrument_id).is_some() {
464                // Profiler already exists, proceed to create updater
465                log::debug!("Pool profiler already exists for {instrument_id}");
466            } else if let Some(pool) = cache.pool(instrument_id) {
467                // Pool exists but no profiler, create profiler from pool
468                let pool = Arc::new(pool.clone());
469                let mut pool_profiler = PoolProfiler::new(pool.clone());
470
471                if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
472                    pool_profiler.initialize(initial_sqrt_price_x96);
473                    log::debug!(
474                        "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
475                    );
476                } else {
477                    log::debug!("Created pool profiler for {instrument_id}");
478                }
479
480                if let Err(e) = cache.add_pool_profiler(pool_profiler) {
481                    log::error!("Failed to add pool profiler for {instrument_id}: {e}");
482                    drop(cache);
483                    return;
484                }
485                drop(cache);
486            } else {
487                // Neither profiler nor pool exists, request snapshot
488                drop(cache);
489
490                let request_id = UUID4::new();
491                let ts_init = self.clock.borrow().timestamp_ns();
492                let request = RequestPoolSnapshot::new(
493                    *instrument_id,
494                    client_id.copied(),
495                    request_id,
496                    ts_init,
497                    None,
498                );
499
500                if let Err(e) = self.execute_defi_request(DefiRequestCommand::PoolSnapshot(request))
501                {
502                    log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
503                } else {
504                    log::debug!("Requested pool snapshot for {instrument_id}");
505                    self.pool_snapshot_pending.insert(*instrument_id);
506                    self.pool_updaters_pending.insert(*instrument_id);
507                    self.pool_event_buffers.entry(*instrument_id).or_default();
508                }
509                return;
510            }
511        }
512
513        // Profiler exists, create updater and subscribe to topics
514        let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
515
516        self.subscribe_pool_updater_topics(*instrument_id, updater.clone());
517        self.pool_updaters.insert(*instrument_id, updater);
518
519        log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
520    }
521}
522
523#[cfg(test)]
524mod tests {
525    use std::sync::Arc;
526
527    use alloy_primitives::{Address, I256, U160, U256};
528    use nautilus_model::{
529        defi::{
530            Chain, DefiData, PoolFeeCollect, PoolFlash, PoolIdentifier, PoolLiquidityUpdate,
531            PoolLiquidityUpdateType, PoolSwap,
532            chain::chains,
533            data::DexPoolData,
534            dex::{AmmType, Dex, DexType},
535        },
536        identifiers::{InstrumentId, Symbol, Venue},
537    };
538    use rstest::*;
539
540    use super::*;
541
542    #[fixture]
543    fn test_instrument_id() -> InstrumentId {
544        InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
545    }
546
547    #[fixture]
548    fn test_chain() -> Arc<Chain> {
549        Arc::new(chains::ETHEREUM.clone())
550    }
551
552    #[fixture]
553    fn test_dex(test_chain: Arc<Chain>) -> Arc<Dex> {
554        Arc::new(Dex::new(
555            (*test_chain).clone(),
556            DexType::UniswapV3,
557            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
558            12369621,
559            AmmType::CLAMM,
560            "PoolCreated(address,address,uint24,int24,address)",
561            "Swap(address,address,int256,int256,uint160,uint128,int24)",
562            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
563            "Burn(address,int24,int24,uint128,uint256,uint256)",
564            "Collect(address,address,int24,int24,uint128,uint128)",
565        ))
566    }
567
568    fn create_test_swap(
569        test_instrument_id: InstrumentId,
570        test_chain: Arc<Chain>,
571        test_dex: Arc<Dex>,
572        block: u64,
573        tx_index: u32,
574        log_index: u32,
575    ) -> PoolSwap {
576        PoolSwap::new(
577            test_chain,
578            test_dex,
579            test_instrument_id,
580            PoolIdentifier::from_address(Address::ZERO),
581            block,
582            format!("0x{block:064x}"),
583            tx_index,
584            log_index,
585            None,
586            Address::ZERO,
587            Address::ZERO,
588            I256::ZERO,
589            I256::ZERO,
590            U160::ZERO,
591            0,
592            0,
593        )
594    }
595
596    fn create_test_liquidity_update(
597        test_instrument_id: InstrumentId,
598        test_chain: Arc<Chain>,
599        test_dex: Arc<Dex>,
600        block: u64,
601        tx_index: u32,
602        log_index: u32,
603    ) -> PoolLiquidityUpdate {
604        PoolLiquidityUpdate::new(
605            test_chain,
606            test_dex,
607            test_instrument_id,
608            PoolIdentifier::from_address(Address::ZERO),
609            PoolLiquidityUpdateType::Mint,
610            block,
611            format!("0x{block:064x}"),
612            tx_index,
613            log_index,
614            None,
615            Address::ZERO,
616            0,
617            U256::ZERO,
618            U256::ZERO,
619            0,
620            0,
621            None,
622        )
623    }
624
625    fn create_test_fee_collect(
626        test_instrument_id: InstrumentId,
627        test_chain: Arc<Chain>,
628        test_dex: Arc<Dex>,
629        block: u64,
630        tx_index: u32,
631        log_index: u32,
632    ) -> PoolFeeCollect {
633        PoolFeeCollect::new(
634            test_chain,
635            test_dex,
636            test_instrument_id,
637            PoolIdentifier::from_address(Address::ZERO),
638            block,
639            format!("0x{block:064x}"),
640            tx_index,
641            log_index,
642            Address::ZERO,
643            0,
644            0,
645            0,
646            0,
647            None,
648        )
649    }
650
651    fn create_test_flash(
652        test_instrument_id: InstrumentId,
653        test_chain: Arc<Chain>,
654        test_dex: Arc<Dex>,
655        block: u64,
656        tx_index: u32,
657        log_index: u32,
658    ) -> PoolFlash {
659        PoolFlash::new(
660            test_chain,
661            test_dex,
662            test_instrument_id,
663            PoolIdentifier::from_address(Address::ZERO),
664            block,
665            format!("0x{block:064x}"),
666            tx_index,
667            log_index,
668            None,
669            Address::ZERO,
670            Address::ZERO,
671            U256::ZERO,
672            U256::ZERO,
673            U256::ZERO,
674            U256::ZERO,
675        )
676    }
677
678    #[rstest]
679    fn test_get_event_block_position_swap(
680        test_instrument_id: InstrumentId,
681        test_chain: Arc<Chain>,
682        test_dex: Arc<Dex>,
683    ) {
684        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
685        let pos = get_event_block_position(&DexPoolData::Swap(swap));
686        assert_eq!(pos, (100, 5, 3));
687    }
688
689    #[rstest]
690    fn test_get_event_block_position_liquidity_update(
691        test_instrument_id: InstrumentId,
692        test_chain: Arc<Chain>,
693        test_dex: Arc<Dex>,
694    ) {
695        let update =
696            create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
697        let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
698        assert_eq!(pos, (200, 10, 7));
699    }
700
701    #[rstest]
702    fn test_get_event_block_position_fee_collect(
703        test_instrument_id: InstrumentId,
704        test_chain: Arc<Chain>,
705        test_dex: Arc<Dex>,
706    ) {
707        let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
708        let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
709        assert_eq!(pos, (300, 15, 2));
710    }
711
712    #[rstest]
713    fn test_get_event_block_position_flash(
714        test_instrument_id: InstrumentId,
715        test_chain: Arc<Chain>,
716        test_dex: Arc<Dex>,
717    ) {
718        let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
719        let pos = get_event_block_position(&DexPoolData::Flash(flash));
720        assert_eq!(pos, (400, 20, 8));
721    }
722
723    #[rstest]
724    fn test_convert_and_sort_empty_events() {
725        let events = convert_and_sort_buffered_events(vec![]);
726        assert!(events.is_empty());
727    }
728
729    #[rstest]
730    fn test_convert_and_sort_filters_non_pool_events(
731        test_instrument_id: InstrumentId,
732        test_chain: Arc<Chain>,
733        test_dex: Arc<Dex>,
734    ) {
735        let events = vec![
736            DefiData::PoolSwap(create_test_swap(
737                test_instrument_id,
738                test_chain,
739                test_dex,
740                100,
741                0,
742                0,
743            )),
744            // Block events would be filtered out
745        ];
746        let sorted = convert_and_sort_buffered_events(events);
747        assert_eq!(sorted.len(), 1);
748    }
749
750    #[rstest]
751    fn test_convert_and_sort_single_event(
752        test_instrument_id: InstrumentId,
753        test_chain: Arc<Chain>,
754        test_dex: Arc<Dex>,
755    ) {
756        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
757        let events = vec![DefiData::PoolSwap(swap)];
758        let sorted = convert_and_sort_buffered_events(events);
759        assert_eq!(sorted.len(), 1);
760        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
761    }
762
763    #[rstest]
764    fn test_convert_and_sort_already_sorted(
765        test_instrument_id: InstrumentId,
766        test_chain: Arc<Chain>,
767        test_dex: Arc<Dex>,
768    ) {
769        let events = vec![
770            DefiData::PoolSwap(create_test_swap(
771                test_instrument_id,
772                test_chain.clone(),
773                test_dex.clone(),
774                100,
775                0,
776                0,
777            )),
778            DefiData::PoolSwap(create_test_swap(
779                test_instrument_id,
780                test_chain.clone(),
781                test_dex.clone(),
782                100,
783                0,
784                1,
785            )),
786            DefiData::PoolSwap(create_test_swap(
787                test_instrument_id,
788                test_chain,
789                test_dex,
790                100,
791                1,
792                0,
793            )),
794        ];
795        let sorted = convert_and_sort_buffered_events(events);
796        assert_eq!(sorted.len(), 3);
797        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
798        assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
799        assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
800    }
801
802    #[rstest]
803    fn test_convert_and_sort_reverse_order(
804        test_instrument_id: InstrumentId,
805        test_chain: Arc<Chain>,
806        test_dex: Arc<Dex>,
807    ) {
808        let events = vec![
809            DefiData::PoolSwap(create_test_swap(
810                test_instrument_id,
811                test_chain.clone(),
812                test_dex.clone(),
813                100,
814                2,
815                5,
816            )),
817            DefiData::PoolSwap(create_test_swap(
818                test_instrument_id,
819                test_chain.clone(),
820                test_dex.clone(),
821                100,
822                1,
823                3,
824            )),
825            DefiData::PoolSwap(create_test_swap(
826                test_instrument_id,
827                test_chain,
828                test_dex,
829                100,
830                0,
831                1,
832            )),
833        ];
834        let sorted = convert_and_sort_buffered_events(events);
835        assert_eq!(sorted.len(), 3);
836        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
837        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
838        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
839    }
840
841    #[rstest]
842    fn test_convert_and_sort_mixed_blocks(
843        test_instrument_id: InstrumentId,
844        test_chain: Arc<Chain>,
845        test_dex: Arc<Dex>,
846    ) {
847        let events = vec![
848            DefiData::PoolSwap(create_test_swap(
849                test_instrument_id,
850                test_chain.clone(),
851                test_dex.clone(),
852                102,
853                0,
854                0,
855            )),
856            DefiData::PoolSwap(create_test_swap(
857                test_instrument_id,
858                test_chain.clone(),
859                test_dex.clone(),
860                100,
861                5,
862                2,
863            )),
864            DefiData::PoolSwap(create_test_swap(
865                test_instrument_id,
866                test_chain,
867                test_dex,
868                101,
869                3,
870                1,
871            )),
872        ];
873        let sorted = convert_and_sort_buffered_events(events);
874        assert_eq!(sorted.len(), 3);
875        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
876        assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
877        assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
878    }
879
880    #[rstest]
881    fn test_convert_and_sort_mixed_event_types(
882        test_instrument_id: InstrumentId,
883        test_chain: Arc<Chain>,
884        test_dex: Arc<Dex>,
885    ) {
886        let events = vec![
887            DefiData::PoolSwap(create_test_swap(
888                test_instrument_id,
889                test_chain.clone(),
890                test_dex.clone(),
891                100,
892                2,
893                0,
894            )),
895            DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
896                test_instrument_id,
897                test_chain.clone(),
898                test_dex.clone(),
899                100,
900                0,
901                0,
902            )),
903            DefiData::PoolFeeCollect(create_test_fee_collect(
904                test_instrument_id,
905                test_chain.clone(),
906                test_dex.clone(),
907                100,
908                1,
909                0,
910            )),
911            DefiData::PoolFlash(create_test_flash(
912                test_instrument_id,
913                test_chain,
914                test_dex,
915                100,
916                3,
917                0,
918            )),
919        ];
920        let sorted = convert_and_sort_buffered_events(events);
921        assert_eq!(sorted.len(), 4);
922        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
923        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
924        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
925        assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
926    }
927
928    #[rstest]
929    fn test_convert_and_sort_same_block_and_tx_different_log_index(
930        test_instrument_id: InstrumentId,
931        test_chain: Arc<Chain>,
932        test_dex: Arc<Dex>,
933    ) {
934        let events = vec![
935            DefiData::PoolSwap(create_test_swap(
936                test_instrument_id,
937                test_chain.clone(),
938                test_dex.clone(),
939                100,
940                5,
941                10,
942            )),
943            DefiData::PoolSwap(create_test_swap(
944                test_instrument_id,
945                test_chain.clone(),
946                test_dex.clone(),
947                100,
948                5,
949                5,
950            )),
951            DefiData::PoolSwap(create_test_swap(
952                test_instrument_id,
953                test_chain,
954                test_dex,
955                100,
956                5,
957                1,
958            )),
959        ];
960        let sorted = convert_and_sort_buffered_events(events);
961        assert_eq!(sorted.len(), 3);
962        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
963        assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
964        assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
965    }
966}