1use std::{any::Any, rc::Rc, sync::Arc};
22
23use nautilus_common::{
24 defi,
25 messages::defi::{
26 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27 },
28 msgbus::{self, handler::ShareableMessageHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32 defi::{
33 Blockchain, DefiData, PoolProfiler,
34 data::{DexPoolData, block::BlockPosition},
35 },
36 identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{DataEngine, pool::PoolUpdater};
40
41fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
43 match event {
44 DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
45 DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
46 DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
47 DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
48 }
49}
50
51fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
53 let mut events: Vec<DexPoolData> = buffered_events
54 .into_iter()
55 .filter_map(|event| match event {
56 DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
57 DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
58 DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
59 DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
60 _ => None,
61 })
62 .collect();
63
64 events.sort_by(|a, b| {
65 let pos_a = get_event_block_position(a);
66 let pos_b = get_event_block_position(b);
67 pos_a.cmp(&pos_b)
68 });
69
70 events
71}
72
73impl DataEngine {
74 #[must_use]
76 pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
77 self.collect_subscriptions(|client| &client.subscriptions_blocks)
78 }
79
80 #[must_use]
82 pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
83 self.collect_subscriptions(|client| &client.subscriptions_pools)
84 }
85
86 #[must_use]
88 pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
89 self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
90 }
91
92 #[must_use]
94 pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
95 self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
96 }
97
98 #[must_use]
100 pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
101 self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
102 }
103
104 #[must_use]
106 pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
107 self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
108 }
109
110 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
117 if let Some(client_id) = cmd.client_id()
118 && self.external_clients.contains(client_id)
119 {
120 if self.config.debug {
121 log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
122 }
123 return Ok(());
124 }
125
126 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
127 log::info!("Forwarding subscription to client {}", client.client_id);
128 client.execute_defi_subscribe(cmd);
129 } else {
130 log::error!(
131 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
132 cmd.client_id(),
133 cmd.venue(),
134 );
135 }
136
137 match cmd {
138 DefiSubscribeCommand::Pool(cmd) => {
139 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
140 }
141 DefiSubscribeCommand::PoolSwaps(cmd) => {
142 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
143 }
144 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
145 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
146 }
147 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
148 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
149 }
150 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
151 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
152 }
153 DefiSubscribeCommand::Blocks(_) => {} }
155
156 Ok(())
157 }
158
159 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
165 if let Some(client_id) = cmd.client_id()
166 && self.external_clients.contains(client_id)
167 {
168 if self.config.debug {
169 log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
170 }
171 return Ok(());
172 }
173
174 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
175 client.execute_defi_unsubscribe(cmd);
176 } else {
177 log::error!(
178 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
179 cmd.client_id(),
180 cmd.venue(),
181 );
182 }
183
184 Ok(())
185 }
186
187 pub fn execute_defi_request(&mut self, req: &DefiRequestCommand) -> anyhow::Result<()> {
194 if let Some(cid) = req.client_id()
196 && self.external_clients.contains(cid)
197 {
198 if self.config.debug {
199 log::debug!("Skipping defi data request for external client {cid}: {req:?}");
200 }
201 return Ok(());
202 }
203
204 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
205 client.execute_defi_request(req)
206 } else {
207 anyhow::bail!(
208 "Cannot handle request: no client found for {:?} {:?}",
209 req.client_id(),
210 req.venue()
211 );
212 }
213 }
214
215 pub fn process_defi_data(&mut self, data: DefiData) {
217 match data {
218 DefiData::Block(block) => {
219 let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
220 msgbus::publish(topic, &block as &dyn Any);
221 }
222 DefiData::Pool(pool) => {
223 if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
224 log::error!("Failed to add Pool to cache: {e}");
225 }
226
227 if self.pool_updaters_pending.remove(&pool.instrument_id) {
229 log::info!(
230 "Pool {} now loaded, creating deferred pool profiler",
231 pool.instrument_id
232 );
233 self.setup_pool_updater(&pool.instrument_id, None);
234 }
235
236 let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
237 msgbus::publish(topic, &pool as &dyn Any);
238 }
239 DefiData::PoolSnapshot(snapshot) => {
240 let instrument_id = snapshot.instrument_id;
241 log::info!(
242 "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
243 snapshot.block_position.number,
244 snapshot.positions.len(),
245 snapshot.ticks.len()
246 );
247
248 if !self.pool_snapshot_pending.contains(&instrument_id) {
250 log::warn!(
251 "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
252 );
253 return;
254 }
255
256 let pool = match self.cache.borrow().pool(&instrument_id) {
258 Some(pool) => Arc::new(pool.clone()),
259 None => {
260 log::error!(
261 "Pool {instrument_id} not found in cache when processing snapshot"
262 );
263 return;
264 }
265 };
266
267 let mut profiler = PoolProfiler::new(pool);
269 if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
270 log::error!(
271 "Failed to restore profiler from snapshot for {instrument_id}: {e}"
272 );
273 return;
274 }
275 log::debug!("Restored pool profiler for {instrument_id} from snapshot");
276
277 let buffered_events = self
279 .pool_event_buffers
280 .remove(&instrument_id)
281 .unwrap_or_default();
282
283 if !buffered_events.is_empty() {
284 log::info!(
285 "Processing {} buffered events for {instrument_id}",
286 buffered_events.len()
287 );
288
289 let events_to_apply = convert_and_sort_buffered_events(buffered_events);
290 let applied_count = Self::apply_buffered_events_to_profiler(
291 &mut profiler,
292 events_to_apply,
293 &snapshot.block_position,
294 instrument_id,
295 );
296
297 log::info!(
298 "Applied {applied_count} buffered events to profiler for {instrument_id}"
299 );
300 }
301
302 if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
304 log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
305 return;
306 }
307
308 self.pool_snapshot_pending.remove(&instrument_id);
310 let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
311 let handler = ShareableMessageHandler(updater.clone());
312
313 self.subscribe_pool_updater_topics(instrument_id, handler);
314 self.pool_updaters.insert(instrument_id, updater);
315
316 log::info!(
317 "Pool profiler setup completed for {instrument_id}, now processing live events"
318 );
319 }
320 DefiData::PoolSwap(swap) => {
321 let instrument_id = swap.instrument_id;
322 if self.pool_snapshot_pending.contains(&instrument_id) {
324 log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
325 self.pool_event_buffers
326 .entry(instrument_id)
327 .or_default()
328 .push(DefiData::PoolSwap(swap));
329 } else {
330 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
331 msgbus::publish(topic, &swap as &dyn Any);
332 }
333 }
334 DefiData::PoolLiquidityUpdate(update) => {
335 let instrument_id = update.instrument_id;
336 if self.pool_snapshot_pending.contains(&instrument_id) {
338 log::debug!(
339 "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
340 );
341 self.pool_event_buffers
342 .entry(instrument_id)
343 .or_default()
344 .push(DefiData::PoolLiquidityUpdate(update));
345 } else {
346 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
347 msgbus::publish(topic, &update as &dyn Any);
348 }
349 }
350 DefiData::PoolFeeCollect(collect) => {
351 let instrument_id = collect.instrument_id;
352 if self.pool_snapshot_pending.contains(&instrument_id) {
354 log::debug!(
355 "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
356 );
357 self.pool_event_buffers
358 .entry(instrument_id)
359 .or_default()
360 .push(DefiData::PoolFeeCollect(collect));
361 } else {
362 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
363 msgbus::publish(topic, &collect as &dyn Any);
364 }
365 }
366 DefiData::PoolFlash(flash) => {
367 let instrument_id = flash.instrument_id;
368 if self.pool_snapshot_pending.contains(&instrument_id) {
370 log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
371 self.pool_event_buffers
372 .entry(instrument_id)
373 .or_default()
374 .push(DefiData::PoolFlash(flash));
375 } else {
376 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
377 msgbus::publish(topic, &flash as &dyn Any);
378 }
379 }
380 }
381 }
382
383 fn subscribe_pool_updater_topics(
385 &self,
386 instrument_id: InstrumentId,
387 handler: ShareableMessageHandler,
388 ) {
389 let topics = [
390 defi::switchboard::get_defi_pool_swaps_topic(instrument_id),
391 defi::switchboard::get_defi_liquidity_topic(instrument_id),
392 defi::switchboard::get_defi_collect_topic(instrument_id),
393 defi::switchboard::get_defi_flash_topic(instrument_id),
394 ];
395
396 for topic in topics {
397 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
398 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
399 }
400 }
401 }
402
403 fn apply_buffered_events_to_profiler(
407 profiler: &mut PoolProfiler,
408 events: Vec<DexPoolData>,
409 snapshot_block: &BlockPosition,
410 instrument_id: InstrumentId,
411 ) -> usize {
412 let mut applied_count = 0;
413
414 for event in events {
415 let event_block = get_event_block_position(&event);
416
417 let is_after_snapshot = event_block.0 > snapshot_block.number
419 || (event_block.0 == snapshot_block.number
420 && event_block.1 > snapshot_block.transaction_index)
421 || (event_block.0 == snapshot_block.number
422 && event_block.1 == snapshot_block.transaction_index
423 && event_block.2 > snapshot_block.log_index);
424
425 if is_after_snapshot {
426 if let Err(e) = profiler.process(&event) {
427 log::error!(
428 "Failed to apply buffered event to profiler for {instrument_id}: {e}"
429 );
430 } else {
431 applied_count += 1;
432 }
433 }
434 }
435
436 applied_count
437 }
438
439 fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
440 if self.pool_updaters.contains_key(instrument_id)
442 || self.pool_updaters_pending.contains(instrument_id)
443 {
444 log::debug!("Pool updater for {instrument_id} already exists");
445 return;
446 }
447
448 log::info!("Setting up pool updater for {instrument_id}");
449
450 {
452 let mut cache = self.cache.borrow_mut();
453
454 if cache.pool_profiler(instrument_id).is_some() {
455 log::debug!("Pool profiler already exists for {instrument_id}");
457 } else if let Some(pool) = cache.pool(instrument_id) {
458 let pool = Arc::new(pool.clone());
460 let mut pool_profiler = PoolProfiler::new(pool.clone());
461
462 if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
463 pool_profiler.initialize(initial_sqrt_price_x96);
464 log::debug!(
465 "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
466 );
467 } else {
468 log::debug!("Created pool profiler for {instrument_id}");
469 }
470
471 if let Err(e) = cache.add_pool_profiler(pool_profiler) {
472 log::error!("Failed to add pool profiler for {instrument_id}: {e}");
473 drop(cache);
474 return;
475 }
476 drop(cache);
477 } else {
478 drop(cache);
480
481 let request_id = UUID4::new();
482 let ts_init = self.clock.borrow().timestamp_ns();
483 let request = RequestPoolSnapshot::new(
484 *instrument_id,
485 client_id.copied(),
486 request_id,
487 ts_init,
488 None,
489 );
490
491 if let Err(e) =
492 self.execute_defi_request(&DefiRequestCommand::PoolSnapshot(request))
493 {
494 log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
495 } else {
496 log::debug!("Requested pool snapshot for {instrument_id}");
497 self.pool_snapshot_pending.insert(*instrument_id);
498 self.pool_updaters_pending.insert(*instrument_id);
499 self.pool_event_buffers.entry(*instrument_id).or_default();
500 }
501 return;
502 }
503 }
504
505 let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
507 let handler = ShareableMessageHandler(updater.clone());
508
509 self.subscribe_pool_updater_topics(*instrument_id, handler);
510 self.pool_updaters.insert(*instrument_id, updater);
511
512 log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
513 }
514}
515
516#[cfg(test)]
521mod tests {
522 use std::sync::Arc;
523
524 use alloy_primitives::{Address, I256, U160, U256};
525 use nautilus_model::{
526 defi::{
527 DefiData, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
528 chain::chains,
529 data::DexPoolData,
530 dex::{AmmType, Dex, DexType},
531 },
532 identifiers::{InstrumentId, Symbol, Venue},
533 };
534 use rstest::*;
535
536 use super::*;
537
538 #[fixture]
540 fn test_instrument_id() -> InstrumentId {
541 InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
542 }
543
544 #[fixture]
545 fn test_chain() -> Arc<nautilus_model::defi::Chain> {
546 Arc::new(chains::ETHEREUM.clone())
547 }
548
549 #[fixture]
550 fn test_dex(test_chain: Arc<nautilus_model::defi::Chain>) -> Arc<Dex> {
551 Arc::new(Dex::new(
552 (*test_chain).clone(),
553 DexType::UniswapV3,
554 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
555 12369621,
556 AmmType::CLAMM,
557 "PoolCreated(address,address,uint24,int24,address)",
558 "Swap(address,address,int256,int256,uint160,uint128,int24)",
559 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
560 "Burn(address,int24,int24,uint128,uint256,uint256)",
561 "Collect(address,address,int24,int24,uint128,uint128)",
562 ))
563 }
564
565 fn create_test_swap(
566 test_instrument_id: InstrumentId,
567 test_chain: Arc<nautilus_model::defi::Chain>,
568 test_dex: Arc<Dex>,
569 block: u64,
570 tx_index: u32,
571 log_index: u32,
572 ) -> PoolSwap {
573 PoolSwap::new(
574 test_chain,
575 test_dex,
576 test_instrument_id,
577 Address::ZERO,
578 block,
579 format!("0x{:064x}", block),
580 tx_index,
581 log_index,
582 None,
583 Address::ZERO,
584 Address::ZERO,
585 I256::ZERO,
586 I256::ZERO,
587 U160::ZERO,
588 0,
589 0,
590 )
591 }
592
593 fn create_test_liquidity_update(
594 test_instrument_id: InstrumentId,
595 test_chain: Arc<nautilus_model::defi::Chain>,
596 test_dex: Arc<Dex>,
597 block: u64,
598 tx_index: u32,
599 log_index: u32,
600 ) -> PoolLiquidityUpdate {
601 use nautilus_model::defi::PoolLiquidityUpdateType;
602
603 PoolLiquidityUpdate::new(
604 test_chain,
605 test_dex,
606 test_instrument_id,
607 Address::ZERO,
608 PoolLiquidityUpdateType::Mint,
609 block,
610 format!("0x{:064x}", block),
611 tx_index,
612 log_index,
613 None,
614 Address::ZERO,
615 0,
616 U256::ZERO,
617 U256::ZERO,
618 0,
619 0,
620 None,
621 )
622 }
623
624 fn create_test_fee_collect(
625 test_instrument_id: InstrumentId,
626 test_chain: Arc<nautilus_model::defi::Chain>,
627 test_dex: Arc<Dex>,
628 block: u64,
629 tx_index: u32,
630 log_index: u32,
631 ) -> PoolFeeCollect {
632 PoolFeeCollect::new(
633 test_chain,
634 test_dex,
635 test_instrument_id,
636 Address::ZERO,
637 block,
638 format!("0x{:064x}", block),
639 tx_index,
640 log_index,
641 Address::ZERO,
642 0,
643 0,
644 0,
645 0,
646 None,
647 )
648 }
649
650 fn create_test_flash(
651 test_instrument_id: InstrumentId,
652 test_chain: Arc<nautilus_model::defi::Chain>,
653 test_dex: Arc<Dex>,
654 block: u64,
655 tx_index: u32,
656 log_index: u32,
657 ) -> PoolFlash {
658 PoolFlash::new(
659 test_chain,
660 test_dex,
661 test_instrument_id,
662 Address::ZERO,
663 block,
664 format!("0x{:064x}", block),
665 tx_index,
666 log_index,
667 None,
668 Address::ZERO,
669 Address::ZERO,
670 U256::ZERO,
671 U256::ZERO,
672 U256::ZERO,
673 U256::ZERO,
674 )
675 }
676
677 #[rstest]
678 fn test_get_event_block_position_swap(
679 test_instrument_id: InstrumentId,
680 test_chain: Arc<nautilus_model::defi::Chain>,
681 test_dex: Arc<Dex>,
682 ) {
683 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
684 let pos = get_event_block_position(&DexPoolData::Swap(swap));
685 assert_eq!(pos, (100, 5, 3));
686 }
687
688 #[rstest]
689 fn test_get_event_block_position_liquidity_update(
690 test_instrument_id: InstrumentId,
691 test_chain: Arc<nautilus_model::defi::Chain>,
692 test_dex: Arc<Dex>,
693 ) {
694 let update =
695 create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
696 let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
697 assert_eq!(pos, (200, 10, 7));
698 }
699
700 #[rstest]
701 fn test_get_event_block_position_fee_collect(
702 test_instrument_id: InstrumentId,
703 test_chain: Arc<nautilus_model::defi::Chain>,
704 test_dex: Arc<Dex>,
705 ) {
706 let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
707 let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
708 assert_eq!(pos, (300, 15, 2));
709 }
710
711 #[rstest]
712 fn test_get_event_block_position_flash(
713 test_instrument_id: InstrumentId,
714 test_chain: Arc<nautilus_model::defi::Chain>,
715 test_dex: Arc<Dex>,
716 ) {
717 let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
718 let pos = get_event_block_position(&DexPoolData::Flash(flash));
719 assert_eq!(pos, (400, 20, 8));
720 }
721
722 #[rstest]
723 fn test_convert_and_sort_empty_events() {
724 let events = convert_and_sort_buffered_events(vec![]);
725 assert!(events.is_empty());
726 }
727
728 #[rstest]
729 fn test_convert_and_sort_filters_non_pool_events(
730 test_instrument_id: InstrumentId,
731 test_chain: Arc<nautilus_model::defi::Chain>,
732 test_dex: Arc<Dex>,
733 ) {
734 let events = vec![
735 DefiData::PoolSwap(create_test_swap(
736 test_instrument_id,
737 test_chain,
738 test_dex,
739 100,
740 0,
741 0,
742 )),
743 ];
745 let sorted = convert_and_sort_buffered_events(events);
746 assert_eq!(sorted.len(), 1);
747 }
748
749 #[rstest]
750 fn test_convert_and_sort_single_event(
751 test_instrument_id: InstrumentId,
752 test_chain: Arc<nautilus_model::defi::Chain>,
753 test_dex: Arc<Dex>,
754 ) {
755 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
756 let events = vec![DefiData::PoolSwap(swap)];
757 let sorted = convert_and_sort_buffered_events(events);
758 assert_eq!(sorted.len(), 1);
759 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
760 }
761
762 #[rstest]
763 fn test_convert_and_sort_already_sorted(
764 test_instrument_id: InstrumentId,
765 test_chain: Arc<nautilus_model::defi::Chain>,
766 test_dex: Arc<Dex>,
767 ) {
768 let events = vec![
769 DefiData::PoolSwap(create_test_swap(
770 test_instrument_id,
771 test_chain.clone(),
772 test_dex.clone(),
773 100,
774 0,
775 0,
776 )),
777 DefiData::PoolSwap(create_test_swap(
778 test_instrument_id,
779 test_chain.clone(),
780 test_dex.clone(),
781 100,
782 0,
783 1,
784 )),
785 DefiData::PoolSwap(create_test_swap(
786 test_instrument_id,
787 test_chain,
788 test_dex,
789 100,
790 1,
791 0,
792 )),
793 ];
794 let sorted = convert_and_sort_buffered_events(events);
795 assert_eq!(sorted.len(), 3);
796 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
797 assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
798 assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
799 }
800
801 #[rstest]
802 fn test_convert_and_sort_reverse_order(
803 test_instrument_id: InstrumentId,
804 test_chain: Arc<nautilus_model::defi::Chain>,
805 test_dex: Arc<Dex>,
806 ) {
807 let events = vec![
808 DefiData::PoolSwap(create_test_swap(
809 test_instrument_id,
810 test_chain.clone(),
811 test_dex.clone(),
812 100,
813 2,
814 5,
815 )),
816 DefiData::PoolSwap(create_test_swap(
817 test_instrument_id,
818 test_chain.clone(),
819 test_dex.clone(),
820 100,
821 1,
822 3,
823 )),
824 DefiData::PoolSwap(create_test_swap(
825 test_instrument_id,
826 test_chain,
827 test_dex,
828 100,
829 0,
830 1,
831 )),
832 ];
833 let sorted = convert_and_sort_buffered_events(events);
834 assert_eq!(sorted.len(), 3);
835 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
836 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
837 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
838 }
839
840 #[rstest]
841 fn test_convert_and_sort_mixed_blocks(
842 test_instrument_id: InstrumentId,
843 test_chain: Arc<nautilus_model::defi::Chain>,
844 test_dex: Arc<Dex>,
845 ) {
846 let events = vec![
847 DefiData::PoolSwap(create_test_swap(
848 test_instrument_id,
849 test_chain.clone(),
850 test_dex.clone(),
851 102,
852 0,
853 0,
854 )),
855 DefiData::PoolSwap(create_test_swap(
856 test_instrument_id,
857 test_chain.clone(),
858 test_dex.clone(),
859 100,
860 5,
861 2,
862 )),
863 DefiData::PoolSwap(create_test_swap(
864 test_instrument_id,
865 test_chain,
866 test_dex,
867 101,
868 3,
869 1,
870 )),
871 ];
872 let sorted = convert_and_sort_buffered_events(events);
873 assert_eq!(sorted.len(), 3);
874 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
875 assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
876 assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
877 }
878
879 #[rstest]
880 fn test_convert_and_sort_mixed_event_types(
881 test_instrument_id: InstrumentId,
882 test_chain: Arc<nautilus_model::defi::Chain>,
883 test_dex: Arc<Dex>,
884 ) {
885 let events = vec![
886 DefiData::PoolSwap(create_test_swap(
887 test_instrument_id,
888 test_chain.clone(),
889 test_dex.clone(),
890 100,
891 2,
892 0,
893 )),
894 DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
895 test_instrument_id,
896 test_chain.clone(),
897 test_dex.clone(),
898 100,
899 0,
900 0,
901 )),
902 DefiData::PoolFeeCollect(create_test_fee_collect(
903 test_instrument_id,
904 test_chain.clone(),
905 test_dex.clone(),
906 100,
907 1,
908 0,
909 )),
910 DefiData::PoolFlash(create_test_flash(
911 test_instrument_id,
912 test_chain,
913 test_dex,
914 100,
915 3,
916 0,
917 )),
918 ];
919 let sorted = convert_and_sort_buffered_events(events);
920 assert_eq!(sorted.len(), 4);
921 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
922 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
923 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
924 assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
925 }
926
927 #[rstest]
928 fn test_convert_and_sort_same_block_and_tx_different_log_index(
929 test_instrument_id: InstrumentId,
930 test_chain: Arc<nautilus_model::defi::Chain>,
931 test_dex: Arc<Dex>,
932 ) {
933 let events = vec![
934 DefiData::PoolSwap(create_test_swap(
935 test_instrument_id,
936 test_chain.clone(),
937 test_dex.clone(),
938 100,
939 5,
940 10,
941 )),
942 DefiData::PoolSwap(create_test_swap(
943 test_instrument_id,
944 test_chain.clone(),
945 test_dex.clone(),
946 100,
947 5,
948 5,
949 )),
950 DefiData::PoolSwap(create_test_swap(
951 test_instrument_id,
952 test_chain,
953 test_dex,
954 100,
955 5,
956 1,
957 )),
958 ];
959 let sorted = convert_and_sort_buffered_events(events);
960 assert_eq!(sorted.len(), 3);
961 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
962 assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
963 assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
964 }
965}