1use std::{rc::Rc, sync::Arc};
22
23use nautilus_common::{
24 defi,
25 messages::defi::{
26 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27 },
28 msgbus::{self, TypedHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32 defi::{
33 Blockchain, DefiData, PoolProfiler,
34 data::{DexPoolData, block::BlockPosition},
35 },
36 identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{
40 DataEngine,
41 pool::{
42 PoolCollectHandler, PoolFlashHandler, PoolLiquidityHandler, PoolSwapHandler, PoolUpdater,
43 },
44};
45
46fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
48 match event {
49 DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
50 DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
51 DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
52 DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
53 }
54}
55
56fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
58 let mut events: Vec<DexPoolData> = buffered_events
59 .into_iter()
60 .filter_map(|event| match event {
61 DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
62 DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
63 DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
64 DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
65 _ => None,
66 })
67 .collect();
68
69 events.sort_by(|a, b| {
70 let pos_a = get_event_block_position(a);
71 let pos_b = get_event_block_position(b);
72 pos_a.cmp(&pos_b)
73 });
74
75 events
76}
77
78impl DataEngine {
79 #[must_use]
81 pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
82 self.collect_subscriptions(|client| &client.subscriptions_blocks)
83 }
84
85 #[must_use]
87 pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
88 self.collect_subscriptions(|client| &client.subscriptions_pools)
89 }
90
91 #[must_use]
93 pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
94 self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
95 }
96
97 #[must_use]
99 pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
100 self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
101 }
102
103 #[must_use]
105 pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
106 self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
107 }
108
109 #[must_use]
111 pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
112 self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
113 }
114
115 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
122 if let Some(client_id) = cmd.client_id()
123 && self.external_clients.contains(client_id)
124 {
125 if self.config.debug {
126 log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
127 }
128 return Ok(());
129 }
130
131 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
132 log::info!("Forwarding subscription to client {}", client.client_id);
133 client.execute_defi_subscribe(cmd);
134 } else {
135 log::error!(
136 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
137 cmd.client_id(),
138 cmd.venue(),
139 );
140 }
141
142 match cmd {
143 DefiSubscribeCommand::Pool(cmd) => {
144 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
145 }
146 DefiSubscribeCommand::PoolSwaps(cmd) => {
147 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
148 }
149 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
150 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
151 }
152 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
153 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
154 }
155 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
156 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
157 }
158 DefiSubscribeCommand::Blocks(_) => {} }
160
161 Ok(())
162 }
163
164 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
170 if let Some(client_id) = cmd.client_id()
171 && self.external_clients.contains(client_id)
172 {
173 if self.config.debug {
174 log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
175 }
176 return Ok(());
177 }
178
179 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
180 client.execute_defi_unsubscribe(cmd);
181 } else {
182 log::error!(
183 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
184 cmd.client_id(),
185 cmd.venue(),
186 );
187 }
188
189 Ok(())
190 }
191
192 pub fn execute_defi_request(&mut self, req: DefiRequestCommand) -> anyhow::Result<()> {
199 if let Some(cid) = req.client_id()
201 && self.external_clients.contains(cid)
202 {
203 if self.config.debug {
204 log::debug!("Skipping defi data request for external client {cid}: {req:?}");
205 }
206 return Ok(());
207 }
208
209 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
210 client.execute_defi_request(req)
211 } else {
212 anyhow::bail!(
213 "Cannot handle request: no client found for {:?} {:?}",
214 req.client_id(),
215 req.venue()
216 );
217 }
218 }
219
220 pub fn process_defi_data(&mut self, data: DefiData) {
222 match data {
223 DefiData::Block(block) => {
224 let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
225 msgbus::publish_defi_block(topic, &block);
226 }
227 DefiData::Pool(pool) => {
228 if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
229 log::error!("Failed to add Pool to cache: {e}");
230 }
231
232 if self.pool_updaters_pending.remove(&pool.instrument_id) {
234 log::info!(
235 "Pool {} now loaded, creating deferred pool profiler",
236 pool.instrument_id
237 );
238 self.setup_pool_updater(&pool.instrument_id, None);
239 }
240
241 let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
242 msgbus::publish_defi_pool(topic, &pool);
243 }
244 DefiData::PoolSnapshot(snapshot) => {
245 let instrument_id = snapshot.instrument_id;
246 log::info!(
247 "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
248 snapshot.block_position.number,
249 snapshot.positions.len(),
250 snapshot.ticks.len()
251 );
252
253 if !self.pool_snapshot_pending.contains(&instrument_id) {
255 log::warn!(
256 "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
257 );
258 return;
259 }
260
261 let pool = match self.cache.borrow().pool(&instrument_id) {
263 Some(pool) => Arc::new(pool.clone()),
264 None => {
265 log::error!(
266 "Pool {instrument_id} not found in cache when processing snapshot"
267 );
268 return;
269 }
270 };
271
272 let mut profiler = PoolProfiler::new(pool);
274 if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
275 log::error!(
276 "Failed to restore profiler from snapshot for {instrument_id}: {e}"
277 );
278 return;
279 }
280 log::debug!("Restored pool profiler for {instrument_id} from snapshot");
281
282 let buffered_events = self
284 .pool_event_buffers
285 .remove(&instrument_id)
286 .unwrap_or_default();
287
288 if !buffered_events.is_empty() {
289 log::info!(
290 "Processing {} buffered events for {instrument_id}",
291 buffered_events.len()
292 );
293
294 let events_to_apply = convert_and_sort_buffered_events(buffered_events);
295 let applied_count = Self::apply_buffered_events_to_profiler(
296 &mut profiler,
297 events_to_apply,
298 &snapshot.block_position,
299 instrument_id,
300 );
301
302 log::info!(
303 "Applied {applied_count} buffered events to profiler for {instrument_id}"
304 );
305 }
306
307 if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
309 log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
310 return;
311 }
312
313 self.pool_snapshot_pending.remove(&instrument_id);
315 let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
316
317 self.subscribe_pool_updater_topics(instrument_id, updater.clone());
318 self.pool_updaters.insert(instrument_id, updater);
319
320 log::info!(
321 "Pool profiler setup completed for {instrument_id}, now processing live events"
322 );
323 }
324 DefiData::PoolSwap(swap) => {
325 let instrument_id = swap.instrument_id;
326 if self.pool_snapshot_pending.contains(&instrument_id) {
328 log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
329 self.pool_event_buffers
330 .entry(instrument_id)
331 .or_default()
332 .push(DefiData::PoolSwap(swap));
333 } else {
334 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
335 msgbus::publish_defi_swap(topic, &swap);
336 }
337 }
338 DefiData::PoolLiquidityUpdate(update) => {
339 let instrument_id = update.instrument_id;
340 if self.pool_snapshot_pending.contains(&instrument_id) {
342 log::debug!(
343 "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
344 );
345 self.pool_event_buffers
346 .entry(instrument_id)
347 .or_default()
348 .push(DefiData::PoolLiquidityUpdate(update));
349 } else {
350 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
351 msgbus::publish_defi_liquidity(topic, &update);
352 }
353 }
354 DefiData::PoolFeeCollect(collect) => {
355 let instrument_id = collect.instrument_id;
356 if self.pool_snapshot_pending.contains(&instrument_id) {
358 log::debug!(
359 "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
360 );
361 self.pool_event_buffers
362 .entry(instrument_id)
363 .or_default()
364 .push(DefiData::PoolFeeCollect(collect));
365 } else {
366 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
367 msgbus::publish_defi_collect(topic, &collect);
368 }
369 }
370 DefiData::PoolFlash(flash) => {
371 let instrument_id = flash.instrument_id;
372 if self.pool_snapshot_pending.contains(&instrument_id) {
374 log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
375 self.pool_event_buffers
376 .entry(instrument_id)
377 .or_default()
378 .push(DefiData::PoolFlash(flash));
379 } else {
380 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
381 msgbus::publish_defi_flash(topic, &flash);
382 }
383 }
384 }
385 }
386
387 fn subscribe_pool_updater_topics(&self, instrument_id: InstrumentId, updater: Rc<PoolUpdater>) {
389 let priority = Some(self.msgbus_priority);
390
391 let swap_topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
393 let swap_handler = TypedHandler(Rc::new(PoolSwapHandler::new(updater.clone())));
394 msgbus::subscribe_defi_swaps(swap_topic.into(), swap_handler, priority);
395
396 let liq_topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
398 let liq_handler = TypedHandler(Rc::new(PoolLiquidityHandler::new(updater.clone())));
399 msgbus::subscribe_defi_liquidity(liq_topic.into(), liq_handler, priority);
400
401 let collect_topic = defi::switchboard::get_defi_collect_topic(instrument_id);
403 let collect_handler = TypedHandler(Rc::new(PoolCollectHandler::new(updater.clone())));
404 msgbus::subscribe_defi_collects(collect_topic.into(), collect_handler, priority);
405
406 let flash_topic = defi::switchboard::get_defi_flash_topic(instrument_id);
408 let flash_handler = TypedHandler(Rc::new(PoolFlashHandler::new(updater)));
409 msgbus::subscribe_defi_flash(flash_topic.into(), flash_handler, priority);
410 }
411
412 fn apply_buffered_events_to_profiler(
416 profiler: &mut PoolProfiler,
417 events: Vec<DexPoolData>,
418 snapshot_block: &BlockPosition,
419 instrument_id: InstrumentId,
420 ) -> usize {
421 let mut applied_count = 0;
422
423 for event in events {
424 let event_block = get_event_block_position(&event);
425
426 let is_after_snapshot = event_block.0 > snapshot_block.number
428 || (event_block.0 == snapshot_block.number
429 && event_block.1 > snapshot_block.transaction_index)
430 || (event_block.0 == snapshot_block.number
431 && event_block.1 == snapshot_block.transaction_index
432 && event_block.2 > snapshot_block.log_index);
433
434 if is_after_snapshot {
435 if let Err(e) = profiler.process(&event) {
436 log::error!(
437 "Failed to apply buffered event to profiler for {instrument_id}: {e}"
438 );
439 } else {
440 applied_count += 1;
441 }
442 }
443 }
444
445 applied_count
446 }
447
448 fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
449 if self.pool_updaters.contains_key(instrument_id)
451 || self.pool_updaters_pending.contains(instrument_id)
452 {
453 log::debug!("Pool updater for {instrument_id} already exists");
454 return;
455 }
456
457 log::info!("Setting up pool updater for {instrument_id}");
458
459 {
461 let mut cache = self.cache.borrow_mut();
462
463 if cache.pool_profiler(instrument_id).is_some() {
464 log::debug!("Pool profiler already exists for {instrument_id}");
466 } else if let Some(pool) = cache.pool(instrument_id) {
467 let pool = Arc::new(pool.clone());
469 let mut pool_profiler = PoolProfiler::new(pool.clone());
470
471 if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
472 pool_profiler.initialize(initial_sqrt_price_x96);
473 log::debug!(
474 "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
475 );
476 } else {
477 log::debug!("Created pool profiler for {instrument_id}");
478 }
479
480 if let Err(e) = cache.add_pool_profiler(pool_profiler) {
481 log::error!("Failed to add pool profiler for {instrument_id}: {e}");
482 drop(cache);
483 return;
484 }
485 drop(cache);
486 } else {
487 drop(cache);
489
490 let request_id = UUID4::new();
491 let ts_init = self.clock.borrow().timestamp_ns();
492 let request = RequestPoolSnapshot::new(
493 *instrument_id,
494 client_id.copied(),
495 request_id,
496 ts_init,
497 None,
498 );
499
500 if let Err(e) = self.execute_defi_request(DefiRequestCommand::PoolSnapshot(request))
501 {
502 log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
503 } else {
504 log::debug!("Requested pool snapshot for {instrument_id}");
505 self.pool_snapshot_pending.insert(*instrument_id);
506 self.pool_updaters_pending.insert(*instrument_id);
507 self.pool_event_buffers.entry(*instrument_id).or_default();
508 }
509 return;
510 }
511 }
512
513 let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
515
516 self.subscribe_pool_updater_topics(*instrument_id, updater.clone());
517 self.pool_updaters.insert(*instrument_id, updater);
518
519 log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
520 }
521}
522
523#[cfg(test)]
524mod tests {
525 use std::sync::Arc;
526
527 use alloy_primitives::{Address, I256, U160, U256};
528 use nautilus_model::{
529 defi::{
530 Chain, DefiData, PoolFeeCollect, PoolFlash, PoolIdentifier, PoolLiquidityUpdate,
531 PoolLiquidityUpdateType, PoolSwap,
532 chain::chains,
533 data::DexPoolData,
534 dex::{AmmType, Dex, DexType},
535 },
536 identifiers::{InstrumentId, Symbol, Venue},
537 };
538 use rstest::*;
539
540 use super::*;
541
542 #[fixture]
543 fn test_instrument_id() -> InstrumentId {
544 InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
545 }
546
547 #[fixture]
548 fn test_chain() -> Arc<Chain> {
549 Arc::new(chains::ETHEREUM.clone())
550 }
551
552 #[fixture]
553 fn test_dex(test_chain: Arc<Chain>) -> Arc<Dex> {
554 Arc::new(Dex::new(
555 (*test_chain).clone(),
556 DexType::UniswapV3,
557 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
558 12369621,
559 AmmType::CLAMM,
560 "PoolCreated(address,address,uint24,int24,address)",
561 "Swap(address,address,int256,int256,uint160,uint128,int24)",
562 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
563 "Burn(address,int24,int24,uint128,uint256,uint256)",
564 "Collect(address,address,int24,int24,uint128,uint128)",
565 ))
566 }
567
568 fn create_test_swap(
569 test_instrument_id: InstrumentId,
570 test_chain: Arc<Chain>,
571 test_dex: Arc<Dex>,
572 block: u64,
573 tx_index: u32,
574 log_index: u32,
575 ) -> PoolSwap {
576 PoolSwap::new(
577 test_chain,
578 test_dex,
579 test_instrument_id,
580 PoolIdentifier::from_address(Address::ZERO),
581 block,
582 format!("0x{block:064x}"),
583 tx_index,
584 log_index,
585 None,
586 Address::ZERO,
587 Address::ZERO,
588 I256::ZERO,
589 I256::ZERO,
590 U160::ZERO,
591 0,
592 0,
593 )
594 }
595
596 fn create_test_liquidity_update(
597 test_instrument_id: InstrumentId,
598 test_chain: Arc<Chain>,
599 test_dex: Arc<Dex>,
600 block: u64,
601 tx_index: u32,
602 log_index: u32,
603 ) -> PoolLiquidityUpdate {
604 PoolLiquidityUpdate::new(
605 test_chain,
606 test_dex,
607 test_instrument_id,
608 PoolIdentifier::from_address(Address::ZERO),
609 PoolLiquidityUpdateType::Mint,
610 block,
611 format!("0x{block:064x}"),
612 tx_index,
613 log_index,
614 None,
615 Address::ZERO,
616 0,
617 U256::ZERO,
618 U256::ZERO,
619 0,
620 0,
621 None,
622 )
623 }
624
625 fn create_test_fee_collect(
626 test_instrument_id: InstrumentId,
627 test_chain: Arc<Chain>,
628 test_dex: Arc<Dex>,
629 block: u64,
630 tx_index: u32,
631 log_index: u32,
632 ) -> PoolFeeCollect {
633 PoolFeeCollect::new(
634 test_chain,
635 test_dex,
636 test_instrument_id,
637 PoolIdentifier::from_address(Address::ZERO),
638 block,
639 format!("0x{block:064x}"),
640 tx_index,
641 log_index,
642 Address::ZERO,
643 0,
644 0,
645 0,
646 0,
647 None,
648 )
649 }
650
651 fn create_test_flash(
652 test_instrument_id: InstrumentId,
653 test_chain: Arc<Chain>,
654 test_dex: Arc<Dex>,
655 block: u64,
656 tx_index: u32,
657 log_index: u32,
658 ) -> PoolFlash {
659 PoolFlash::new(
660 test_chain,
661 test_dex,
662 test_instrument_id,
663 PoolIdentifier::from_address(Address::ZERO),
664 block,
665 format!("0x{block:064x}"),
666 tx_index,
667 log_index,
668 None,
669 Address::ZERO,
670 Address::ZERO,
671 U256::ZERO,
672 U256::ZERO,
673 U256::ZERO,
674 U256::ZERO,
675 )
676 }
677
678 #[rstest]
679 fn test_get_event_block_position_swap(
680 test_instrument_id: InstrumentId,
681 test_chain: Arc<Chain>,
682 test_dex: Arc<Dex>,
683 ) {
684 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
685 let pos = get_event_block_position(&DexPoolData::Swap(swap));
686 assert_eq!(pos, (100, 5, 3));
687 }
688
689 #[rstest]
690 fn test_get_event_block_position_liquidity_update(
691 test_instrument_id: InstrumentId,
692 test_chain: Arc<Chain>,
693 test_dex: Arc<Dex>,
694 ) {
695 let update =
696 create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
697 let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
698 assert_eq!(pos, (200, 10, 7));
699 }
700
701 #[rstest]
702 fn test_get_event_block_position_fee_collect(
703 test_instrument_id: InstrumentId,
704 test_chain: Arc<Chain>,
705 test_dex: Arc<Dex>,
706 ) {
707 let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
708 let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
709 assert_eq!(pos, (300, 15, 2));
710 }
711
712 #[rstest]
713 fn test_get_event_block_position_flash(
714 test_instrument_id: InstrumentId,
715 test_chain: Arc<Chain>,
716 test_dex: Arc<Dex>,
717 ) {
718 let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
719 let pos = get_event_block_position(&DexPoolData::Flash(flash));
720 assert_eq!(pos, (400, 20, 8));
721 }
722
723 #[rstest]
724 fn test_convert_and_sort_empty_events() {
725 let events = convert_and_sort_buffered_events(vec![]);
726 assert!(events.is_empty());
727 }
728
729 #[rstest]
730 fn test_convert_and_sort_filters_non_pool_events(
731 test_instrument_id: InstrumentId,
732 test_chain: Arc<Chain>,
733 test_dex: Arc<Dex>,
734 ) {
735 let events = vec![
736 DefiData::PoolSwap(create_test_swap(
737 test_instrument_id,
738 test_chain,
739 test_dex,
740 100,
741 0,
742 0,
743 )),
744 ];
746 let sorted = convert_and_sort_buffered_events(events);
747 assert_eq!(sorted.len(), 1);
748 }
749
750 #[rstest]
751 fn test_convert_and_sort_single_event(
752 test_instrument_id: InstrumentId,
753 test_chain: Arc<Chain>,
754 test_dex: Arc<Dex>,
755 ) {
756 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
757 let events = vec![DefiData::PoolSwap(swap)];
758 let sorted = convert_and_sort_buffered_events(events);
759 assert_eq!(sorted.len(), 1);
760 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
761 }
762
763 #[rstest]
764 fn test_convert_and_sort_already_sorted(
765 test_instrument_id: InstrumentId,
766 test_chain: Arc<Chain>,
767 test_dex: Arc<Dex>,
768 ) {
769 let events = vec![
770 DefiData::PoolSwap(create_test_swap(
771 test_instrument_id,
772 test_chain.clone(),
773 test_dex.clone(),
774 100,
775 0,
776 0,
777 )),
778 DefiData::PoolSwap(create_test_swap(
779 test_instrument_id,
780 test_chain.clone(),
781 test_dex.clone(),
782 100,
783 0,
784 1,
785 )),
786 DefiData::PoolSwap(create_test_swap(
787 test_instrument_id,
788 test_chain,
789 test_dex,
790 100,
791 1,
792 0,
793 )),
794 ];
795 let sorted = convert_and_sort_buffered_events(events);
796 assert_eq!(sorted.len(), 3);
797 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
798 assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
799 assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
800 }
801
802 #[rstest]
803 fn test_convert_and_sort_reverse_order(
804 test_instrument_id: InstrumentId,
805 test_chain: Arc<Chain>,
806 test_dex: Arc<Dex>,
807 ) {
808 let events = vec![
809 DefiData::PoolSwap(create_test_swap(
810 test_instrument_id,
811 test_chain.clone(),
812 test_dex.clone(),
813 100,
814 2,
815 5,
816 )),
817 DefiData::PoolSwap(create_test_swap(
818 test_instrument_id,
819 test_chain.clone(),
820 test_dex.clone(),
821 100,
822 1,
823 3,
824 )),
825 DefiData::PoolSwap(create_test_swap(
826 test_instrument_id,
827 test_chain,
828 test_dex,
829 100,
830 0,
831 1,
832 )),
833 ];
834 let sorted = convert_and_sort_buffered_events(events);
835 assert_eq!(sorted.len(), 3);
836 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
837 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
838 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
839 }
840
841 #[rstest]
842 fn test_convert_and_sort_mixed_blocks(
843 test_instrument_id: InstrumentId,
844 test_chain: Arc<Chain>,
845 test_dex: Arc<Dex>,
846 ) {
847 let events = vec![
848 DefiData::PoolSwap(create_test_swap(
849 test_instrument_id,
850 test_chain.clone(),
851 test_dex.clone(),
852 102,
853 0,
854 0,
855 )),
856 DefiData::PoolSwap(create_test_swap(
857 test_instrument_id,
858 test_chain.clone(),
859 test_dex.clone(),
860 100,
861 5,
862 2,
863 )),
864 DefiData::PoolSwap(create_test_swap(
865 test_instrument_id,
866 test_chain,
867 test_dex,
868 101,
869 3,
870 1,
871 )),
872 ];
873 let sorted = convert_and_sort_buffered_events(events);
874 assert_eq!(sorted.len(), 3);
875 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
876 assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
877 assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
878 }
879
880 #[rstest]
881 fn test_convert_and_sort_mixed_event_types(
882 test_instrument_id: InstrumentId,
883 test_chain: Arc<Chain>,
884 test_dex: Arc<Dex>,
885 ) {
886 let events = vec![
887 DefiData::PoolSwap(create_test_swap(
888 test_instrument_id,
889 test_chain.clone(),
890 test_dex.clone(),
891 100,
892 2,
893 0,
894 )),
895 DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
896 test_instrument_id,
897 test_chain.clone(),
898 test_dex.clone(),
899 100,
900 0,
901 0,
902 )),
903 DefiData::PoolFeeCollect(create_test_fee_collect(
904 test_instrument_id,
905 test_chain.clone(),
906 test_dex.clone(),
907 100,
908 1,
909 0,
910 )),
911 DefiData::PoolFlash(create_test_flash(
912 test_instrument_id,
913 test_chain,
914 test_dex,
915 100,
916 3,
917 0,
918 )),
919 ];
920 let sorted = convert_and_sort_buffered_events(events);
921 assert_eq!(sorted.len(), 4);
922 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
923 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
924 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
925 assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
926 }
927
928 #[rstest]
929 fn test_convert_and_sort_same_block_and_tx_different_log_index(
930 test_instrument_id: InstrumentId,
931 test_chain: Arc<Chain>,
932 test_dex: Arc<Dex>,
933 ) {
934 let events = vec![
935 DefiData::PoolSwap(create_test_swap(
936 test_instrument_id,
937 test_chain.clone(),
938 test_dex.clone(),
939 100,
940 5,
941 10,
942 )),
943 DefiData::PoolSwap(create_test_swap(
944 test_instrument_id,
945 test_chain.clone(),
946 test_dex.clone(),
947 100,
948 5,
949 5,
950 )),
951 DefiData::PoolSwap(create_test_swap(
952 test_instrument_id,
953 test_chain,
954 test_dex,
955 100,
956 5,
957 1,
958 )),
959 ];
960 let sorted = convert_and_sort_buffered_events(events);
961 assert_eq!(sorted.len(), 3);
962 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
963 assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
964 assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
965 }
966}