nautilus_data/defi/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific data engine functionality.
17//!
18//! This module provides DeFi processing methods for the `DataEngine`.
19//! All code in this module requires the `defi` feature flag.
20
21use std::{any::Any, rc::Rc, sync::Arc};
22
23use nautilus_common::{
24    defi,
25    messages::defi::{
26        DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27    },
28    msgbus::{self, handler::ShareableMessageHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32    defi::{
33        Blockchain, DefiData, PoolProfiler,
34        data::{DexPoolData, block::BlockPosition},
35    },
36    identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{DataEngine, pool::PoolUpdater};
40
41/// Extracts the block position tuple from a DexPoolData event.
42fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
43    match event {
44        DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
45        DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
46        DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
47        DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
48    }
49}
50
51/// Converts buffered DefiData events to DexPoolData and sorts by block position.
52fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
53    let mut events: Vec<DexPoolData> = buffered_events
54        .into_iter()
55        .filter_map(|event| match event {
56            DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
57            DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
58            DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
59            DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
60            _ => None,
61        })
62        .collect();
63
64    events.sort_by(|a, b| {
65        let pos_a = get_event_block_position(a);
66        let pos_b = get_event_block_position(b);
67        pos_a.cmp(&pos_b)
68    });
69
70    events
71}
72
73impl DataEngine {
74    /// Returns all blockchains for which blocks subscriptions exist.
75    #[must_use]
76    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
77        self.collect_subscriptions(|client| &client.subscriptions_blocks)
78    }
79
80    /// Returns all instrument IDs for which pool subscriptions exist.
81    #[must_use]
82    pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
83        self.collect_subscriptions(|client| &client.subscriptions_pools)
84    }
85
86    /// Returns all instrument IDs for which swap subscriptions exist.
87    #[must_use]
88    pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
89        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
90    }
91
92    /// Returns all instrument IDs for which liquidity update subscriptions exist.
93    #[must_use]
94    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
95        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
96    }
97
98    /// Returns all instrument IDs for which fee collect subscriptions exist.
99    #[must_use]
100    pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
101        self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
102    }
103
104    /// Returns all instrument IDs for which flash loan subscriptions exist.
105    #[must_use]
106    pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
107        self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
108    }
109
110    /// Handles a subscribe command, updating internal state and forwarding to the client.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
115    /// or if the underlying client operation fails.
116    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
117        if let Some(client_id) = cmd.client_id()
118            && self.external_clients.contains(client_id)
119        {
120            if self.config.debug {
121                log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
122            }
123            return Ok(());
124        }
125
126        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
127            log::info!("Forwarding subscription to client {}", client.client_id);
128            client.execute_defi_subscribe(cmd);
129        } else {
130            log::error!(
131                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
132                cmd.client_id(),
133                cmd.venue(),
134            );
135        }
136
137        match cmd {
138            DefiSubscribeCommand::Pool(cmd) => {
139                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
140            }
141            DefiSubscribeCommand::PoolSwaps(cmd) => {
142                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
143            }
144            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
145                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
146            }
147            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
148                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
149            }
150            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
151                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
152            }
153            DefiSubscribeCommand::Blocks(_) => {} // No pool setup needed for blocks
154        }
155
156        Ok(())
157    }
158
159    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the underlying client operation fails.
164    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
165        if let Some(client_id) = cmd.client_id()
166            && self.external_clients.contains(client_id)
167        {
168            if self.config.debug {
169                log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
170            }
171            return Ok(());
172        }
173
174        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
175            client.execute_defi_unsubscribe(cmd);
176        } else {
177            log::error!(
178                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
179                cmd.client_id(),
180                cmd.venue(),
181            );
182        }
183
184        Ok(())
185    }
186
187    /// Sends a [`DefiRequestCommand`] to a suitable data client implementation.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if no client is found for the given client ID or venue,
192    /// or if the client fails to process the request.
193    pub fn execute_defi_request(&mut self, req: &DefiRequestCommand) -> anyhow::Result<()> {
194        // Skip requests for external clients
195        if let Some(cid) = req.client_id()
196            && self.external_clients.contains(cid)
197        {
198            if self.config.debug {
199                log::debug!("Skipping defi data request for external client {cid}: {req:?}");
200            }
201            return Ok(());
202        }
203
204        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
205            client.execute_defi_request(req)
206        } else {
207            anyhow::bail!(
208                "Cannot handle request: no client found for {:?} {:?}",
209                req.client_id(),
210                req.venue()
211            );
212        }
213    }
214
215    /// Processes DeFi-specific data events.
216    pub fn process_defi_data(&mut self, data: DefiData) {
217        match data {
218            DefiData::Block(block) => {
219                let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
220                msgbus::publish(topic, &block as &dyn Any);
221            }
222            DefiData::Pool(pool) => {
223                if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
224                    log::error!("Failed to add Pool to cache: {e}");
225                }
226
227                // Check if pool profiler creation was deferred
228                if self.pool_updaters_pending.remove(&pool.instrument_id) {
229                    log::info!(
230                        "Pool {} now loaded, creating deferred pool profiler",
231                        pool.instrument_id
232                    );
233                    self.setup_pool_updater(&pool.instrument_id, None);
234                }
235
236                let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
237                msgbus::publish(topic, &pool as &dyn Any);
238            }
239            DefiData::PoolSnapshot(snapshot) => {
240                let instrument_id = snapshot.instrument_id;
241                log::info!(
242                    "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
243                    snapshot.block_position.number,
244                    snapshot.positions.len(),
245                    snapshot.ticks.len()
246                );
247
248                // Validate we're expecting this snapshot
249                if !self.pool_snapshot_pending.contains(&instrument_id) {
250                    log::warn!(
251                        "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
252                    );
253                    return;
254                }
255
256                // Get pool from cache
257                let pool = match self.cache.borrow().pool(&instrument_id) {
258                    Some(pool) => Arc::new(pool.clone()),
259                    None => {
260                        log::error!(
261                            "Pool {instrument_id} not found in cache when processing snapshot"
262                        );
263                        return;
264                    }
265                };
266
267                // Create profiler and restore from snapshot
268                let mut profiler = PoolProfiler::new(pool);
269                if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
270                    log::error!(
271                        "Failed to restore profiler from snapshot for {instrument_id}: {e}"
272                    );
273                    return;
274                }
275                log::debug!("Restored pool profiler for {instrument_id} from snapshot");
276
277                // Process buffered events
278                let buffered_events = self
279                    .pool_event_buffers
280                    .remove(&instrument_id)
281                    .unwrap_or_default();
282
283                if !buffered_events.is_empty() {
284                    log::info!(
285                        "Processing {} buffered events for {instrument_id}",
286                        buffered_events.len()
287                    );
288
289                    let events_to_apply = convert_and_sort_buffered_events(buffered_events);
290                    let applied_count = Self::apply_buffered_events_to_profiler(
291                        &mut profiler,
292                        events_to_apply,
293                        &snapshot.block_position,
294                        instrument_id,
295                    );
296
297                    log::info!(
298                        "Applied {applied_count} buffered events to profiler for {instrument_id}"
299                    );
300                }
301
302                // Add profiler to cache
303                if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
304                    log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
305                    return;
306                }
307
308                // Create updater and subscribe to topics
309                self.pool_snapshot_pending.remove(&instrument_id);
310                let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
311                let handler = ShareableMessageHandler(updater.clone());
312
313                self.subscribe_pool_updater_topics(instrument_id, handler);
314                self.pool_updaters.insert(instrument_id, updater);
315
316                log::info!(
317                    "Pool profiler setup completed for {instrument_id}, now processing live events"
318                );
319            }
320            DefiData::PoolSwap(swap) => {
321                let instrument_id = swap.instrument_id;
322                // Buffer if waiting for snapshot, otherwise publish
323                if self.pool_snapshot_pending.contains(&instrument_id) {
324                    log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
325                    self.pool_event_buffers
326                        .entry(instrument_id)
327                        .or_default()
328                        .push(DefiData::PoolSwap(swap));
329                } else {
330                    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
331                    msgbus::publish(topic, &swap as &dyn Any);
332                }
333            }
334            DefiData::PoolLiquidityUpdate(update) => {
335                let instrument_id = update.instrument_id;
336                // Buffer if waiting for snapshot, otherwise publish
337                if self.pool_snapshot_pending.contains(&instrument_id) {
338                    log::debug!(
339                        "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
340                    );
341                    self.pool_event_buffers
342                        .entry(instrument_id)
343                        .or_default()
344                        .push(DefiData::PoolLiquidityUpdate(update));
345                } else {
346                    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
347                    msgbus::publish(topic, &update as &dyn Any);
348                }
349            }
350            DefiData::PoolFeeCollect(collect) => {
351                let instrument_id = collect.instrument_id;
352                // Buffer if waiting for snapshot, otherwise publish
353                if self.pool_snapshot_pending.contains(&instrument_id) {
354                    log::debug!(
355                        "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
356                    );
357                    self.pool_event_buffers
358                        .entry(instrument_id)
359                        .or_default()
360                        .push(DefiData::PoolFeeCollect(collect));
361                } else {
362                    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
363                    msgbus::publish(topic, &collect as &dyn Any);
364                }
365            }
366            DefiData::PoolFlash(flash) => {
367                let instrument_id = flash.instrument_id;
368                // Buffer if waiting for snapshot, otherwise publish
369                if self.pool_snapshot_pending.contains(&instrument_id) {
370                    log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
371                    self.pool_event_buffers
372                        .entry(instrument_id)
373                        .or_default()
374                        .push(DefiData::PoolFlash(flash));
375                } else {
376                    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
377                    msgbus::publish(topic, &flash as &dyn Any);
378                }
379            }
380        }
381    }
382
383    /// Subscribes a pool updater handler to all relevant pool data topics.
384    fn subscribe_pool_updater_topics(
385        &self,
386        instrument_id: InstrumentId,
387        handler: ShareableMessageHandler,
388    ) {
389        let topics = [
390            defi::switchboard::get_defi_pool_swaps_topic(instrument_id),
391            defi::switchboard::get_defi_liquidity_topic(instrument_id),
392            defi::switchboard::get_defi_collect_topic(instrument_id),
393            defi::switchboard::get_defi_flash_topic(instrument_id),
394        ];
395
396        for topic in topics {
397            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
398                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
399            }
400        }
401    }
402
403    /// Applies buffered events to a pool profiler, filtering to events after the snapshot.
404    ///
405    /// Returns the count of successfully applied events.
406    fn apply_buffered_events_to_profiler(
407        profiler: &mut PoolProfiler,
408        events: Vec<DexPoolData>,
409        snapshot_block: &BlockPosition,
410        instrument_id: InstrumentId,
411    ) -> usize {
412        let mut applied_count = 0;
413
414        for event in events {
415            let event_block = get_event_block_position(&event);
416
417            // Only apply events that occurred after the snapshot
418            let is_after_snapshot = event_block.0 > snapshot_block.number
419                || (event_block.0 == snapshot_block.number
420                    && event_block.1 > snapshot_block.transaction_index)
421                || (event_block.0 == snapshot_block.number
422                    && event_block.1 == snapshot_block.transaction_index
423                    && event_block.2 > snapshot_block.log_index);
424
425            if is_after_snapshot {
426                if let Err(e) = profiler.process(&event) {
427                    log::error!(
428                        "Failed to apply buffered event to profiler for {instrument_id}: {e}"
429                    );
430                } else {
431                    applied_count += 1;
432                }
433            }
434        }
435
436        applied_count
437    }
438
439    fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
440        // Early return if updater already exists or we are in the middle of setting it up.
441        if self.pool_updaters.contains_key(instrument_id)
442            || self.pool_updaters_pending.contains(instrument_id)
443        {
444            log::debug!("Pool updater for {instrument_id} already exists");
445            return;
446        }
447
448        log::info!("Setting up pool updater for {instrument_id}");
449
450        // Check cache state and ensure profiler exists
451        {
452            let mut cache = self.cache.borrow_mut();
453
454            if cache.pool_profiler(instrument_id).is_some() {
455                // Profiler already exists, proceed to create updater
456                log::debug!("Pool profiler already exists for {instrument_id}");
457            } else if let Some(pool) = cache.pool(instrument_id) {
458                // Pool exists but no profiler, create profiler from pool
459                let pool = Arc::new(pool.clone());
460                let mut pool_profiler = PoolProfiler::new(pool.clone());
461
462                if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
463                    pool_profiler.initialize(initial_sqrt_price_x96);
464                    log::debug!(
465                        "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
466                    );
467                } else {
468                    log::debug!("Created pool profiler for {instrument_id}");
469                }
470
471                if let Err(e) = cache.add_pool_profiler(pool_profiler) {
472                    log::error!("Failed to add pool profiler for {instrument_id}: {e}");
473                    drop(cache);
474                    return;
475                }
476                drop(cache);
477            } else {
478                // Neither profiler nor pool exists, request snapshot
479                drop(cache);
480
481                let request_id = UUID4::new();
482                let ts_init = self.clock.borrow().timestamp_ns();
483                let request = RequestPoolSnapshot::new(
484                    *instrument_id,
485                    client_id.copied(),
486                    request_id,
487                    ts_init,
488                    None,
489                );
490
491                if let Err(e) =
492                    self.execute_defi_request(&DefiRequestCommand::PoolSnapshot(request))
493                {
494                    log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
495                } else {
496                    log::debug!("Requested pool snapshot for {instrument_id}");
497                    self.pool_snapshot_pending.insert(*instrument_id);
498                    self.pool_updaters_pending.insert(*instrument_id);
499                    self.pool_event_buffers.entry(*instrument_id).or_default();
500                }
501                return;
502            }
503        }
504
505        // Profiler exists, create updater and subscribe to topics
506        let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
507        let handler = ShareableMessageHandler(updater.clone());
508
509        self.subscribe_pool_updater_topics(*instrument_id, handler);
510        self.pool_updaters.insert(*instrument_id, updater);
511
512        log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
513    }
514}
515
516////////////////////////////////////////////////////////////////////////////////
517// Tests
518////////////////////////////////////////////////////////////////////////////////
519
520#[cfg(test)]
521mod tests {
522    use std::sync::Arc;
523
524    use alloy_primitives::{Address, I256, U160, U256};
525    use nautilus_model::{
526        defi::{
527            DefiData, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
528            chain::chains,
529            data::DexPoolData,
530            dex::{AmmType, Dex, DexType},
531        },
532        identifiers::{InstrumentId, Symbol, Venue},
533    };
534    use rstest::*;
535
536    use super::*;
537
538    // Test fixtures
539    #[fixture]
540    fn test_instrument_id() -> InstrumentId {
541        InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
542    }
543
544    #[fixture]
545    fn test_chain() -> Arc<nautilus_model::defi::Chain> {
546        Arc::new(chains::ETHEREUM.clone())
547    }
548
549    #[fixture]
550    fn test_dex(test_chain: Arc<nautilus_model::defi::Chain>) -> Arc<Dex> {
551        Arc::new(Dex::new(
552            (*test_chain).clone(),
553            DexType::UniswapV3,
554            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
555            12369621,
556            AmmType::CLAMM,
557            "PoolCreated(address,address,uint24,int24,address)",
558            "Swap(address,address,int256,int256,uint160,uint128,int24)",
559            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
560            "Burn(address,int24,int24,uint128,uint256,uint256)",
561            "Collect(address,address,int24,int24,uint128,uint128)",
562        ))
563    }
564
565    fn create_test_swap(
566        test_instrument_id: InstrumentId,
567        test_chain: Arc<nautilus_model::defi::Chain>,
568        test_dex: Arc<Dex>,
569        block: u64,
570        tx_index: u32,
571        log_index: u32,
572    ) -> PoolSwap {
573        PoolSwap::new(
574            test_chain,
575            test_dex,
576            test_instrument_id,
577            Address::ZERO,
578            block,
579            format!("0x{:064x}", block),
580            tx_index,
581            log_index,
582            None,
583            Address::ZERO,
584            Address::ZERO,
585            I256::ZERO,
586            I256::ZERO,
587            U160::ZERO,
588            0,
589            0,
590        )
591    }
592
593    fn create_test_liquidity_update(
594        test_instrument_id: InstrumentId,
595        test_chain: Arc<nautilus_model::defi::Chain>,
596        test_dex: Arc<Dex>,
597        block: u64,
598        tx_index: u32,
599        log_index: u32,
600    ) -> PoolLiquidityUpdate {
601        use nautilus_model::defi::PoolLiquidityUpdateType;
602
603        PoolLiquidityUpdate::new(
604            test_chain,
605            test_dex,
606            test_instrument_id,
607            Address::ZERO,
608            PoolLiquidityUpdateType::Mint,
609            block,
610            format!("0x{:064x}", block),
611            tx_index,
612            log_index,
613            None,
614            Address::ZERO,
615            0,
616            U256::ZERO,
617            U256::ZERO,
618            0,
619            0,
620            None,
621        )
622    }
623
624    fn create_test_fee_collect(
625        test_instrument_id: InstrumentId,
626        test_chain: Arc<nautilus_model::defi::Chain>,
627        test_dex: Arc<Dex>,
628        block: u64,
629        tx_index: u32,
630        log_index: u32,
631    ) -> PoolFeeCollect {
632        PoolFeeCollect::new(
633            test_chain,
634            test_dex,
635            test_instrument_id,
636            Address::ZERO,
637            block,
638            format!("0x{:064x}", block),
639            tx_index,
640            log_index,
641            Address::ZERO,
642            0,
643            0,
644            0,
645            0,
646            None,
647        )
648    }
649
650    fn create_test_flash(
651        test_instrument_id: InstrumentId,
652        test_chain: Arc<nautilus_model::defi::Chain>,
653        test_dex: Arc<Dex>,
654        block: u64,
655        tx_index: u32,
656        log_index: u32,
657    ) -> PoolFlash {
658        PoolFlash::new(
659            test_chain,
660            test_dex,
661            test_instrument_id,
662            Address::ZERO,
663            block,
664            format!("0x{:064x}", block),
665            tx_index,
666            log_index,
667            None,
668            Address::ZERO,
669            Address::ZERO,
670            U256::ZERO,
671            U256::ZERO,
672            U256::ZERO,
673            U256::ZERO,
674        )
675    }
676
677    #[rstest]
678    fn test_get_event_block_position_swap(
679        test_instrument_id: InstrumentId,
680        test_chain: Arc<nautilus_model::defi::Chain>,
681        test_dex: Arc<Dex>,
682    ) {
683        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
684        let pos = get_event_block_position(&DexPoolData::Swap(swap));
685        assert_eq!(pos, (100, 5, 3));
686    }
687
688    #[rstest]
689    fn test_get_event_block_position_liquidity_update(
690        test_instrument_id: InstrumentId,
691        test_chain: Arc<nautilus_model::defi::Chain>,
692        test_dex: Arc<Dex>,
693    ) {
694        let update =
695            create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
696        let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
697        assert_eq!(pos, (200, 10, 7));
698    }
699
700    #[rstest]
701    fn test_get_event_block_position_fee_collect(
702        test_instrument_id: InstrumentId,
703        test_chain: Arc<nautilus_model::defi::Chain>,
704        test_dex: Arc<Dex>,
705    ) {
706        let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
707        let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
708        assert_eq!(pos, (300, 15, 2));
709    }
710
711    #[rstest]
712    fn test_get_event_block_position_flash(
713        test_instrument_id: InstrumentId,
714        test_chain: Arc<nautilus_model::defi::Chain>,
715        test_dex: Arc<Dex>,
716    ) {
717        let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
718        let pos = get_event_block_position(&DexPoolData::Flash(flash));
719        assert_eq!(pos, (400, 20, 8));
720    }
721
722    #[rstest]
723    fn test_convert_and_sort_empty_events() {
724        let events = convert_and_sort_buffered_events(vec![]);
725        assert!(events.is_empty());
726    }
727
728    #[rstest]
729    fn test_convert_and_sort_filters_non_pool_events(
730        test_instrument_id: InstrumentId,
731        test_chain: Arc<nautilus_model::defi::Chain>,
732        test_dex: Arc<Dex>,
733    ) {
734        let events = vec![
735            DefiData::PoolSwap(create_test_swap(
736                test_instrument_id,
737                test_chain,
738                test_dex,
739                100,
740                0,
741                0,
742            )),
743            // Block events would be filtered out
744        ];
745        let sorted = convert_and_sort_buffered_events(events);
746        assert_eq!(sorted.len(), 1);
747    }
748
749    #[rstest]
750    fn test_convert_and_sort_single_event(
751        test_instrument_id: InstrumentId,
752        test_chain: Arc<nautilus_model::defi::Chain>,
753        test_dex: Arc<Dex>,
754    ) {
755        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
756        let events = vec![DefiData::PoolSwap(swap)];
757        let sorted = convert_and_sort_buffered_events(events);
758        assert_eq!(sorted.len(), 1);
759        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
760    }
761
762    #[rstest]
763    fn test_convert_and_sort_already_sorted(
764        test_instrument_id: InstrumentId,
765        test_chain: Arc<nautilus_model::defi::Chain>,
766        test_dex: Arc<Dex>,
767    ) {
768        let events = vec![
769            DefiData::PoolSwap(create_test_swap(
770                test_instrument_id,
771                test_chain.clone(),
772                test_dex.clone(),
773                100,
774                0,
775                0,
776            )),
777            DefiData::PoolSwap(create_test_swap(
778                test_instrument_id,
779                test_chain.clone(),
780                test_dex.clone(),
781                100,
782                0,
783                1,
784            )),
785            DefiData::PoolSwap(create_test_swap(
786                test_instrument_id,
787                test_chain,
788                test_dex,
789                100,
790                1,
791                0,
792            )),
793        ];
794        let sorted = convert_and_sort_buffered_events(events);
795        assert_eq!(sorted.len(), 3);
796        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
797        assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
798        assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
799    }
800
801    #[rstest]
802    fn test_convert_and_sort_reverse_order(
803        test_instrument_id: InstrumentId,
804        test_chain: Arc<nautilus_model::defi::Chain>,
805        test_dex: Arc<Dex>,
806    ) {
807        let events = vec![
808            DefiData::PoolSwap(create_test_swap(
809                test_instrument_id,
810                test_chain.clone(),
811                test_dex.clone(),
812                100,
813                2,
814                5,
815            )),
816            DefiData::PoolSwap(create_test_swap(
817                test_instrument_id,
818                test_chain.clone(),
819                test_dex.clone(),
820                100,
821                1,
822                3,
823            )),
824            DefiData::PoolSwap(create_test_swap(
825                test_instrument_id,
826                test_chain,
827                test_dex,
828                100,
829                0,
830                1,
831            )),
832        ];
833        let sorted = convert_and_sort_buffered_events(events);
834        assert_eq!(sorted.len(), 3);
835        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
836        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
837        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
838    }
839
840    #[rstest]
841    fn test_convert_and_sort_mixed_blocks(
842        test_instrument_id: InstrumentId,
843        test_chain: Arc<nautilus_model::defi::Chain>,
844        test_dex: Arc<Dex>,
845    ) {
846        let events = vec![
847            DefiData::PoolSwap(create_test_swap(
848                test_instrument_id,
849                test_chain.clone(),
850                test_dex.clone(),
851                102,
852                0,
853                0,
854            )),
855            DefiData::PoolSwap(create_test_swap(
856                test_instrument_id,
857                test_chain.clone(),
858                test_dex.clone(),
859                100,
860                5,
861                2,
862            )),
863            DefiData::PoolSwap(create_test_swap(
864                test_instrument_id,
865                test_chain,
866                test_dex,
867                101,
868                3,
869                1,
870            )),
871        ];
872        let sorted = convert_and_sort_buffered_events(events);
873        assert_eq!(sorted.len(), 3);
874        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
875        assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
876        assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
877    }
878
879    #[rstest]
880    fn test_convert_and_sort_mixed_event_types(
881        test_instrument_id: InstrumentId,
882        test_chain: Arc<nautilus_model::defi::Chain>,
883        test_dex: Arc<Dex>,
884    ) {
885        let events = vec![
886            DefiData::PoolSwap(create_test_swap(
887                test_instrument_id,
888                test_chain.clone(),
889                test_dex.clone(),
890                100,
891                2,
892                0,
893            )),
894            DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
895                test_instrument_id,
896                test_chain.clone(),
897                test_dex.clone(),
898                100,
899                0,
900                0,
901            )),
902            DefiData::PoolFeeCollect(create_test_fee_collect(
903                test_instrument_id,
904                test_chain.clone(),
905                test_dex.clone(),
906                100,
907                1,
908                0,
909            )),
910            DefiData::PoolFlash(create_test_flash(
911                test_instrument_id,
912                test_chain,
913                test_dex,
914                100,
915                3,
916                0,
917            )),
918        ];
919        let sorted = convert_and_sort_buffered_events(events);
920        assert_eq!(sorted.len(), 4);
921        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
922        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
923        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
924        assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
925    }
926
927    #[rstest]
928    fn test_convert_and_sort_same_block_and_tx_different_log_index(
929        test_instrument_id: InstrumentId,
930        test_chain: Arc<nautilus_model::defi::Chain>,
931        test_dex: Arc<Dex>,
932    ) {
933        let events = vec![
934            DefiData::PoolSwap(create_test_swap(
935                test_instrument_id,
936                test_chain.clone(),
937                test_dex.clone(),
938                100,
939                5,
940                10,
941            )),
942            DefiData::PoolSwap(create_test_swap(
943                test_instrument_id,
944                test_chain.clone(),
945                test_dex.clone(),
946                100,
947                5,
948                5,
949            )),
950            DefiData::PoolSwap(create_test_swap(
951                test_instrument_id,
952                test_chain,
953                test_dex,
954                100,
955                5,
956                1,
957            )),
958        ];
959        let sorted = convert_and_sort_buffered_events(events);
960        assert_eq!(sorted.len(), 3);
961        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
962        assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
963        assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
964    }
965}