1use std::{any::Any, rc::Rc, sync::Arc};
22
23use nautilus_common::{
24 defi,
25 messages::defi::{
26 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27 },
28 msgbus::{self, handler::ShareableMessageHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32 defi::{
33 Blockchain, DefiData, PoolProfiler,
34 data::{DexPoolData, block::BlockPosition},
35 },
36 identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{DataEngine, pool::PoolUpdater};
40
41fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
43 match event {
44 DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
45 DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
46 DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
47 DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
48 }
49}
50
51fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
53 let mut events: Vec<DexPoolData> = buffered_events
54 .into_iter()
55 .filter_map(|event| match event {
56 DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
57 DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
58 DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
59 DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
60 _ => None,
61 })
62 .collect();
63
64 events.sort_by(|a, b| {
65 let pos_a = get_event_block_position(a);
66 let pos_b = get_event_block_position(b);
67 pos_a.cmp(&pos_b)
68 });
69
70 events
71}
72
73impl DataEngine {
74 #[must_use]
76 pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
77 self.collect_subscriptions(|client| &client.subscriptions_blocks)
78 }
79
80 #[must_use]
82 pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
83 self.collect_subscriptions(|client| &client.subscriptions_pools)
84 }
85
86 #[must_use]
88 pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
89 self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
90 }
91
92 #[must_use]
94 pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
95 self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
96 }
97
98 #[must_use]
100 pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
101 self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
102 }
103
104 #[must_use]
106 pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
107 self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
108 }
109
110 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
117 if let Some(client_id) = cmd.client_id()
118 && self.external_clients.contains(client_id)
119 {
120 if self.config.debug {
121 log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
122 }
123 return Ok(());
124 }
125
126 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
127 log::info!("Forwarding subscription to client {:?}", cmd.client_id());
128 client.execute_defi_subscribe(cmd);
129 } else {
130 log::error!(
131 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
132 cmd.client_id(),
133 cmd.venue(),
134 );
135 }
136
137 match cmd {
138 DefiSubscribeCommand::Pool(cmd) => {
139 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref())
140 }
141 DefiSubscribeCommand::PoolSwaps(cmd) => {
142 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref())
143 }
144 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
145 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref())
146 }
147 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
148 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref())
149 }
150 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
151 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref())
152 }
153 DefiSubscribeCommand::Blocks(_) => {} }
155
156 Ok(())
157 }
158
159 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
165 if let Some(client_id) = cmd.client_id()
166 && self.external_clients.contains(client_id)
167 {
168 if self.config.debug {
169 log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
170 }
171 return Ok(());
172 }
173
174 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
175 client.execute_defi_unsubscribe(cmd);
176 } else {
177 log::error!(
178 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
179 cmd.client_id(),
180 cmd.venue(),
181 );
182 }
183
184 Ok(())
185 }
186
187 pub fn execute_defi_request(&mut self, req: &DefiRequestCommand) -> anyhow::Result<()> {
194 if let Some(cid) = req.client_id()
196 && self.external_clients.contains(cid)
197 {
198 if self.config.debug {
199 log::debug!("Skipping defi data request for external client {cid}: {req:?}");
200 }
201 return Ok(());
202 }
203
204 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
205 client.execute_defi_request(req)
206 } else {
207 anyhow::bail!(
208 "Cannot handle request: no client found for {:?} {:?}",
209 req.client_id(),
210 req.venue()
211 );
212 }
213 }
214
215 pub fn process_defi_data(&mut self, data: DefiData) {
217 match data {
218 DefiData::Block(block) => {
219 let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
220 msgbus::publish(topic, &block as &dyn Any);
221 }
222 DefiData::Pool(pool) => {
223 if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
224 log::error!("Failed to add Pool to cache: {e}");
225 }
226
227 if self.pool_updaters_pending.remove(&pool.instrument_id) {
229 log::info!(
230 "Pool {} now loaded, creating deferred pool profiler",
231 pool.instrument_id
232 );
233 self.setup_pool_updater(&pool.instrument_id, None);
234 }
235
236 let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
237 msgbus::publish(topic, &pool as &dyn Any);
238 }
239 DefiData::PoolSnapshot(snapshot) => {
240 let instrument_id = snapshot.instrument_id;
241 log::info!(
242 "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
243 snapshot.block_position.number,
244 snapshot.positions.len(),
245 snapshot.ticks.len()
246 );
247
248 if !self.pool_snapshot_pending.contains(&instrument_id) {
250 log::warn!(
251 "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
252 );
253 return;
254 }
255
256 let pool = match self.cache.borrow().pool(&instrument_id) {
258 Some(pool) => Arc::new(pool.clone()),
259 None => {
260 log::error!(
261 "Pool {instrument_id} not found in cache when processing snapshot"
262 );
263 return;
264 }
265 };
266
267 let mut profiler = PoolProfiler::new(pool);
269 if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
270 log::error!(
271 "Failed to restore profiler from snapshot for {instrument_id}: {e}"
272 );
273 return;
274 }
275 log::debug!("Restored pool profiler for {instrument_id} from snapshot");
276
277 let buffered_events = self
279 .pool_event_buffers
280 .remove(&instrument_id)
281 .unwrap_or_default();
282
283 if !buffered_events.is_empty() {
284 log::info!(
285 "Processing {} buffered events for {instrument_id}",
286 buffered_events.len()
287 );
288
289 let events_to_apply = convert_and_sort_buffered_events(buffered_events);
290 let applied_count = Self::apply_buffered_events_to_profiler(
291 &mut profiler,
292 events_to_apply,
293 &snapshot.block_position,
294 instrument_id,
295 );
296
297 log::info!(
298 "Applied {applied_count} buffered events to profiler for {instrument_id}"
299 );
300 }
301
302 if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
304 log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
305 return;
306 }
307
308 self.pool_snapshot_pending.remove(&instrument_id);
310 let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
311 let handler = ShareableMessageHandler(updater.clone());
312
313 self.subscribe_pool_updater_topics(instrument_id, handler);
314 self.pool_updaters.insert(instrument_id, updater);
315
316 log::info!(
317 "Pool profiler setup completed for {instrument_id}, now processing live events"
318 );
319 }
320 DefiData::PoolSwap(swap) => {
321 let instrument_id = swap.instrument_id;
322 if self.pool_snapshot_pending.contains(&instrument_id) {
324 log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
325 self.pool_event_buffers
326 .entry(instrument_id)
327 .or_default()
328 .push(DefiData::PoolSwap(swap));
329 } else {
330 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
331 msgbus::publish(topic, &swap as &dyn Any);
332 }
333 }
334 DefiData::PoolLiquidityUpdate(update) => {
335 let instrument_id = update.instrument_id;
336 if self.pool_snapshot_pending.contains(&instrument_id) {
338 log::debug!(
339 "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
340 );
341 self.pool_event_buffers
342 .entry(instrument_id)
343 .or_default()
344 .push(DefiData::PoolLiquidityUpdate(update));
345 } else {
346 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
347 msgbus::publish(topic, &update as &dyn Any);
348 }
349 }
350 DefiData::PoolFeeCollect(collect) => {
351 let instrument_id = collect.instrument_id;
352 if self.pool_snapshot_pending.contains(&instrument_id) {
354 log::debug!(
355 "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
356 );
357 self.pool_event_buffers
358 .entry(instrument_id)
359 .or_default()
360 .push(DefiData::PoolFeeCollect(collect));
361 } else {
362 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
363 msgbus::publish(topic, &collect as &dyn Any);
364 }
365 }
366 DefiData::PoolFlash(flash) => {
367 let instrument_id = flash.instrument_id;
368 if self.pool_snapshot_pending.contains(&instrument_id) {
370 log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
371 self.pool_event_buffers
372 .entry(instrument_id)
373 .or_default()
374 .push(DefiData::PoolFlash(flash));
375 } else {
376 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
377 msgbus::publish(topic, &flash as &dyn Any);
378 }
379 }
380 }
381 }
382
383 fn subscribe_pool_updater_topics(
385 &self,
386 instrument_id: InstrumentId,
387 handler: ShareableMessageHandler,
388 ) {
389 let topics = [
390 defi::switchboard::get_defi_pool_swaps_topic(instrument_id),
391 defi::switchboard::get_defi_liquidity_topic(instrument_id),
392 defi::switchboard::get_defi_collect_topic(instrument_id),
393 defi::switchboard::get_defi_flash_topic(instrument_id),
394 ];
395
396 for topic in topics {
397 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
398 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
399 }
400 }
401 }
402
403 fn apply_buffered_events_to_profiler(
407 profiler: &mut PoolProfiler,
408 events: Vec<DexPoolData>,
409 snapshot_block: &BlockPosition,
410 instrument_id: InstrumentId,
411 ) -> usize {
412 let mut applied_count = 0;
413
414 for event in events {
415 let event_block = get_event_block_position(&event);
416
417 let is_after_snapshot = event_block.0 > snapshot_block.number
419 || (event_block.0 == snapshot_block.number
420 && event_block.1 > snapshot_block.transaction_index)
421 || (event_block.0 == snapshot_block.number
422 && event_block.1 == snapshot_block.transaction_index
423 && event_block.2 > snapshot_block.log_index);
424
425 if is_after_snapshot {
426 if let Err(e) = profiler.process(&event) {
427 log::error!(
428 "Failed to apply buffered event to profiler for {instrument_id}: {e}"
429 );
430 } else {
431 applied_count += 1;
432 }
433 }
434 }
435
436 applied_count
437 }
438
439 fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
440 if self.pool_updaters.contains_key(instrument_id) {
442 log::debug!("Pool updater for {instrument_id} already exists");
443 return;
444 }
445
446 log::info!("Setting up pool updater for {instrument_id}");
447
448 {
450 let mut cache = self.cache.borrow_mut();
451
452 if cache.pool_profiler(instrument_id).is_some() {
453 log::debug!("Pool profiler already exists for {instrument_id}");
455 } else if let Some(pool) = cache.pool(instrument_id) {
456 let pool = Arc::new(pool.clone());
458 let mut pool_profiler = PoolProfiler::new(pool.clone());
459
460 if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
461 pool_profiler.initialize(initial_sqrt_price_x96);
462 log::debug!(
463 "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
464 );
465 } else {
466 log::debug!("Created pool profiler for {instrument_id}");
467 }
468
469 if let Err(e) = cache.add_pool_profiler(pool_profiler) {
470 log::error!("Failed to add pool profiler for {instrument_id}: {e}");
471 return;
472 }
473 } else {
474 drop(cache);
476
477 let request_id = UUID4::new();
478 let ts_init = self.clock.borrow().timestamp_ns();
479 let request = RequestPoolSnapshot::new(
480 *instrument_id,
481 client_id.copied(),
482 request_id,
483 ts_init,
484 None,
485 );
486
487 if let Err(e) =
488 self.execute_defi_request(&DefiRequestCommand::PoolSnapshot(request))
489 {
490 log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
491 self.pool_updaters_pending.insert(*instrument_id);
492 } else {
493 log::debug!("Requested pool snapshot for {instrument_id}");
494 self.pool_snapshot_pending.insert(*instrument_id);
495 self.pool_event_buffers.entry(*instrument_id).or_default();
496 }
497 return;
498 }
499 }
500
501 let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
503 let handler = ShareableMessageHandler(updater.clone());
504
505 self.subscribe_pool_updater_topics(*instrument_id, handler);
506 self.pool_updaters.insert(*instrument_id, updater);
507
508 log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
509 }
510}
511
512#[cfg(test)]
517mod tests {
518 use std::sync::Arc;
519
520 use alloy_primitives::{Address, I256, U160, U256};
521 use nautilus_model::{
522 defi::{
523 DefiData, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
524 chain::chains,
525 data::DexPoolData,
526 dex::{AmmType, Dex, DexType},
527 },
528 identifiers::{InstrumentId, Symbol, Venue},
529 };
530 use rstest::*;
531
532 use super::*;
533
534 #[fixture]
536 fn test_instrument_id() -> InstrumentId {
537 InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
538 }
539
540 #[fixture]
541 fn test_chain() -> Arc<nautilus_model::defi::Chain> {
542 Arc::new(chains::ETHEREUM.clone())
543 }
544
545 #[fixture]
546 fn test_dex(test_chain: Arc<nautilus_model::defi::Chain>) -> Arc<Dex> {
547 Arc::new(Dex::new(
548 (*test_chain).clone(),
549 DexType::UniswapV3,
550 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
551 12369621,
552 AmmType::CLAMM,
553 "PoolCreated(address,address,uint24,int24,address)",
554 "Swap(address,address,int256,int256,uint160,uint128,int24)",
555 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
556 "Burn(address,int24,int24,uint128,uint256,uint256)",
557 "Collect(address,address,int24,int24,uint128,uint128)",
558 ))
559 }
560
561 fn create_test_swap(
562 test_instrument_id: InstrumentId,
563 test_chain: Arc<nautilus_model::defi::Chain>,
564 test_dex: Arc<Dex>,
565 block: u64,
566 tx_index: u32,
567 log_index: u32,
568 ) -> PoolSwap {
569 PoolSwap::new(
570 test_chain,
571 test_dex,
572 test_instrument_id,
573 Address::ZERO,
574 block,
575 format!("0x{:064x}", block),
576 tx_index,
577 log_index,
578 None,
579 Address::ZERO,
580 Address::ZERO,
581 I256::ZERO,
582 I256::ZERO,
583 U160::ZERO,
584 0,
585 0,
586 None,
587 None,
588 None,
589 )
590 }
591
592 fn create_test_liquidity_update(
593 test_instrument_id: InstrumentId,
594 test_chain: Arc<nautilus_model::defi::Chain>,
595 test_dex: Arc<Dex>,
596 block: u64,
597 tx_index: u32,
598 log_index: u32,
599 ) -> PoolLiquidityUpdate {
600 use nautilus_model::defi::PoolLiquidityUpdateType;
601
602 PoolLiquidityUpdate::new(
603 test_chain,
604 test_dex,
605 test_instrument_id,
606 Address::ZERO,
607 PoolLiquidityUpdateType::Mint,
608 block,
609 format!("0x{:064x}", block),
610 tx_index,
611 log_index,
612 None,
613 Address::ZERO,
614 0,
615 U256::ZERO,
616 U256::ZERO,
617 0,
618 0,
619 None,
620 )
621 }
622
623 fn create_test_fee_collect(
624 test_instrument_id: InstrumentId,
625 test_chain: Arc<nautilus_model::defi::Chain>,
626 test_dex: Arc<Dex>,
627 block: u64,
628 tx_index: u32,
629 log_index: u32,
630 ) -> PoolFeeCollect {
631 PoolFeeCollect::new(
632 test_chain,
633 test_dex,
634 test_instrument_id,
635 Address::ZERO,
636 block,
637 format!("0x{:064x}", block),
638 tx_index,
639 log_index,
640 Address::ZERO,
641 0,
642 0,
643 0,
644 0,
645 None,
646 )
647 }
648
649 fn create_test_flash(
650 test_instrument_id: InstrumentId,
651 test_chain: Arc<nautilus_model::defi::Chain>,
652 test_dex: Arc<Dex>,
653 block: u64,
654 tx_index: u32,
655 log_index: u32,
656 ) -> PoolFlash {
657 PoolFlash::new(
658 test_chain,
659 test_dex,
660 test_instrument_id,
661 Address::ZERO,
662 block,
663 format!("0x{:064x}", block),
664 tx_index,
665 log_index,
666 None,
667 Address::ZERO,
668 Address::ZERO,
669 U256::ZERO,
670 U256::ZERO,
671 U256::ZERO,
672 U256::ZERO,
673 )
674 }
675
676 #[rstest]
677 fn test_get_event_block_position_swap(
678 test_instrument_id: InstrumentId,
679 test_chain: Arc<nautilus_model::defi::Chain>,
680 test_dex: Arc<Dex>,
681 ) {
682 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
683 let pos = get_event_block_position(&DexPoolData::Swap(swap));
684 assert_eq!(pos, (100, 5, 3));
685 }
686
687 #[rstest]
688 fn test_get_event_block_position_liquidity_update(
689 test_instrument_id: InstrumentId,
690 test_chain: Arc<nautilus_model::defi::Chain>,
691 test_dex: Arc<Dex>,
692 ) {
693 let update =
694 create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
695 let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
696 assert_eq!(pos, (200, 10, 7));
697 }
698
699 #[rstest]
700 fn test_get_event_block_position_fee_collect(
701 test_instrument_id: InstrumentId,
702 test_chain: Arc<nautilus_model::defi::Chain>,
703 test_dex: Arc<Dex>,
704 ) {
705 let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
706 let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
707 assert_eq!(pos, (300, 15, 2));
708 }
709
710 #[rstest]
711 fn test_get_event_block_position_flash(
712 test_instrument_id: InstrumentId,
713 test_chain: Arc<nautilus_model::defi::Chain>,
714 test_dex: Arc<Dex>,
715 ) {
716 let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
717 let pos = get_event_block_position(&DexPoolData::Flash(flash));
718 assert_eq!(pos, (400, 20, 8));
719 }
720
721 #[rstest]
722 fn test_convert_and_sort_empty_events() {
723 let events = convert_and_sort_buffered_events(vec![]);
724 assert!(events.is_empty());
725 }
726
727 #[rstest]
728 fn test_convert_and_sort_filters_non_pool_events(
729 test_instrument_id: InstrumentId,
730 test_chain: Arc<nautilus_model::defi::Chain>,
731 test_dex: Arc<Dex>,
732 ) {
733 let events = vec![
734 DefiData::PoolSwap(create_test_swap(
735 test_instrument_id,
736 test_chain,
737 test_dex,
738 100,
739 0,
740 0,
741 )),
742 ];
744 let sorted = convert_and_sort_buffered_events(events);
745 assert_eq!(sorted.len(), 1);
746 }
747
748 #[rstest]
749 fn test_convert_and_sort_single_event(
750 test_instrument_id: InstrumentId,
751 test_chain: Arc<nautilus_model::defi::Chain>,
752 test_dex: Arc<Dex>,
753 ) {
754 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
755 let events = vec![DefiData::PoolSwap(swap)];
756 let sorted = convert_and_sort_buffered_events(events);
757 assert_eq!(sorted.len(), 1);
758 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
759 }
760
761 #[rstest]
762 fn test_convert_and_sort_already_sorted(
763 test_instrument_id: InstrumentId,
764 test_chain: Arc<nautilus_model::defi::Chain>,
765 test_dex: Arc<Dex>,
766 ) {
767 let events = vec![
768 DefiData::PoolSwap(create_test_swap(
769 test_instrument_id,
770 test_chain.clone(),
771 test_dex.clone(),
772 100,
773 0,
774 0,
775 )),
776 DefiData::PoolSwap(create_test_swap(
777 test_instrument_id,
778 test_chain.clone(),
779 test_dex.clone(),
780 100,
781 0,
782 1,
783 )),
784 DefiData::PoolSwap(create_test_swap(
785 test_instrument_id,
786 test_chain,
787 test_dex,
788 100,
789 1,
790 0,
791 )),
792 ];
793 let sorted = convert_and_sort_buffered_events(events);
794 assert_eq!(sorted.len(), 3);
795 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
796 assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
797 assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
798 }
799
800 #[rstest]
801 fn test_convert_and_sort_reverse_order(
802 test_instrument_id: InstrumentId,
803 test_chain: Arc<nautilus_model::defi::Chain>,
804 test_dex: Arc<Dex>,
805 ) {
806 let events = vec![
807 DefiData::PoolSwap(create_test_swap(
808 test_instrument_id,
809 test_chain.clone(),
810 test_dex.clone(),
811 100,
812 2,
813 5,
814 )),
815 DefiData::PoolSwap(create_test_swap(
816 test_instrument_id,
817 test_chain.clone(),
818 test_dex.clone(),
819 100,
820 1,
821 3,
822 )),
823 DefiData::PoolSwap(create_test_swap(
824 test_instrument_id,
825 test_chain,
826 test_dex,
827 100,
828 0,
829 1,
830 )),
831 ];
832 let sorted = convert_and_sort_buffered_events(events);
833 assert_eq!(sorted.len(), 3);
834 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
835 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
836 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
837 }
838
839 #[rstest]
840 fn test_convert_and_sort_mixed_blocks(
841 test_instrument_id: InstrumentId,
842 test_chain: Arc<nautilus_model::defi::Chain>,
843 test_dex: Arc<Dex>,
844 ) {
845 let events = vec![
846 DefiData::PoolSwap(create_test_swap(
847 test_instrument_id,
848 test_chain.clone(),
849 test_dex.clone(),
850 102,
851 0,
852 0,
853 )),
854 DefiData::PoolSwap(create_test_swap(
855 test_instrument_id,
856 test_chain.clone(),
857 test_dex.clone(),
858 100,
859 5,
860 2,
861 )),
862 DefiData::PoolSwap(create_test_swap(
863 test_instrument_id,
864 test_chain,
865 test_dex,
866 101,
867 3,
868 1,
869 )),
870 ];
871 let sorted = convert_and_sort_buffered_events(events);
872 assert_eq!(sorted.len(), 3);
873 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
874 assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
875 assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
876 }
877
878 #[rstest]
879 fn test_convert_and_sort_mixed_event_types(
880 test_instrument_id: InstrumentId,
881 test_chain: Arc<nautilus_model::defi::Chain>,
882 test_dex: Arc<Dex>,
883 ) {
884 let events = vec![
885 DefiData::PoolSwap(create_test_swap(
886 test_instrument_id,
887 test_chain.clone(),
888 test_dex.clone(),
889 100,
890 2,
891 0,
892 )),
893 DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
894 test_instrument_id,
895 test_chain.clone(),
896 test_dex.clone(),
897 100,
898 0,
899 0,
900 )),
901 DefiData::PoolFeeCollect(create_test_fee_collect(
902 test_instrument_id,
903 test_chain.clone(),
904 test_dex.clone(),
905 100,
906 1,
907 0,
908 )),
909 DefiData::PoolFlash(create_test_flash(
910 test_instrument_id,
911 test_chain,
912 test_dex,
913 100,
914 3,
915 0,
916 )),
917 ];
918 let sorted = convert_and_sort_buffered_events(events);
919 assert_eq!(sorted.len(), 4);
920 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
921 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
922 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
923 assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
924 }
925
926 #[rstest]
927 fn test_convert_and_sort_same_block_and_tx_different_log_index(
928 test_instrument_id: InstrumentId,
929 test_chain: Arc<nautilus_model::defi::Chain>,
930 test_dex: Arc<Dex>,
931 ) {
932 let events = vec![
933 DefiData::PoolSwap(create_test_swap(
934 test_instrument_id,
935 test_chain.clone(),
936 test_dex.clone(),
937 100,
938 5,
939 10,
940 )),
941 DefiData::PoolSwap(create_test_swap(
942 test_instrument_id,
943 test_chain.clone(),
944 test_dex.clone(),
945 100,
946 5,
947 5,
948 )),
949 DefiData::PoolSwap(create_test_swap(
950 test_instrument_id,
951 test_chain,
952 test_dex,
953 100,
954 5,
955 1,
956 )),
957 ];
958 let sorted = convert_and_sort_buffered_events(events);
959 assert_eq!(sorted.len(), 3);
960 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
961 assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
962 assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
963 }
964}