1use std::collections::HashMap;
19
20use alloy_primitives::{Address, I256, U160, U256};
21
22use crate::defi::{
23 PoolLiquidityUpdate, PoolSwap, SharedPool,
24 data::{
25 DexPoolData, PoolFeeCollect, PoolLiquidityUpdateType, block::BlockPosition,
26 flash::PoolFlash,
27 },
28 pool_analysis::{
29 position::PoolPosition,
30 snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
31 swap_math::compute_swap_step,
32 },
33 tick_map::{
34 TickMap,
35 full_math::{FullMath, Q128},
36 liquidity_math::liquidity_math_add,
37 sqrt_price_math::{get_amount0_delta, get_amount1_delta, get_amounts_for_liquidity},
38 tick::PoolTick,
39 tick_math::{
40 MAX_SQRT_RATIO, MIN_SQRT_RATIO, get_sqrt_ratio_at_tick, get_tick_at_sqrt_ratio,
41 },
42 },
43};
44
/// Replays pool events (swaps, mints, burns, fee collections, flashes) against a
/// local model of a tick-based pool and keeps the resulting state, positions and
/// counters.
///
/// A profiler must be initialized (via `initialize` or `restore_from_snapshot`)
/// before any event is processed; the processing methods panic otherwise.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
)]
pub struct PoolProfiler {
    /// Pool definition; supplies tick spacing, fee tier, initial tick and the
    /// identifiers copied into generated events.
    pub pool: SharedPool,
    /// Positions keyed by `PoolPosition::get_position_key(owner, tick_lower, tick_upper)`.
    positions: HashMap<String, PoolPosition>,
    /// Per-tick data together with the currently active liquidity (`tick_map.liquidity`).
    pub tick_map: TickMap,
    /// Current tick, sqrt price, global fee growth and protocol fee accumulators.
    pub state: PoolState,
    /// Event counters, deposited/collected totals and (debug builds) processing times.
    pub analytics: PoolAnalytics,
    /// Position of the last event handled by `process`, or the snapshot position
    /// after `restore_from_snapshot`; `None` until then.
    last_processed_event: Option<BlockPosition>,
    /// Set to `true` by `initialize` and `restore_from_snapshot`.
    pub is_initialized: bool,
}
83
84impl PoolProfiler {
85 #[must_use]
91 pub fn new(pool: SharedPool) -> Self {
92 let tick_spacing = pool.tick_spacing.expect("Pool tick spacing must be set");
93 Self {
94 pool,
95 positions: HashMap::new(),
96 tick_map: TickMap::new(tick_spacing),
97 state: PoolState::default(),
98 analytics: PoolAnalytics::default(),
99 last_processed_event: None,
100 is_initialized: false,
101 }
102 }
103
104 pub fn initialize(&mut self, price_sqrt_ratio_x96: U160) {
112 if self.is_initialized {
113 panic!("Pool already initialized");
114 }
115
116 let calculated_tick = get_tick_at_sqrt_ratio(price_sqrt_ratio_x96);
117 if let Some(initial_tick) = self.pool.initial_tick {
118 assert_eq!(
119 initial_tick, calculated_tick,
120 "Calculated tick does not match pool initial tick"
121 );
122 }
123
124 tracing::info!(
125 "Initializing pool profiler with tick {} and price sqrt ratio {}",
126 calculated_tick,
127 price_sqrt_ratio_x96
128 );
129
130 self.state.current_tick = calculated_tick;
131 self.state.price_sqrt_ratio_x96 = price_sqrt_ratio_x96;
132 self.is_initialized = true;
133 }
134
135 pub fn check_if_initialized(&self) {
141 if !self.is_initialized {
142 panic!("Pool is not initialized");
143 }
144 }
145
146 pub fn process(&mut self, event: &DexPoolData) -> anyhow::Result<()> {
159 match event {
160 DexPoolData::Swap(swap) => {
161 #[cfg(debug_assertions)]
162 let start = std::time::Instant::now();
163
164 self.process_swap(swap)?;
165
166 #[cfg(debug_assertions)]
167 {
168 self.analytics.swap_processing_time += start.elapsed();
169 }
170
171 self.analytics.total_swaps += 1;
172 self.last_processed_event = Some(BlockPosition::new(
173 swap.block,
174 swap.transaction_hash.clone(),
175 swap.transaction_index,
176 swap.log_index,
177 ))
178 }
179 DexPoolData::LiquidityUpdate(update) => match update.kind {
180 PoolLiquidityUpdateType::Mint => {
181 #[cfg(debug_assertions)]
182 let start = std::time::Instant::now();
183
184 self.process_mint(update)?;
185
186 #[cfg(debug_assertions)]
187 {
188 self.analytics.mint_processing_time += start.elapsed();
189 }
190
191 self.analytics.total_mints += 1;
192 self.last_processed_event = Some(BlockPosition::new(
193 update.block,
194 update.transaction_hash.clone(),
195 update.transaction_index,
196 update.log_index,
197 ))
198 }
199 PoolLiquidityUpdateType::Burn => {
200 #[cfg(debug_assertions)]
201 let start = std::time::Instant::now();
202
203 self.process_burn(update)?;
204
205 #[cfg(debug_assertions)]
206 {
207 self.analytics.burn_processing_time += start.elapsed();
208 }
209
210 self.analytics.total_burns += 1;
211 self.last_processed_event = Some(BlockPosition::new(
212 update.block,
213 update.transaction_hash.clone(),
214 update.transaction_index,
215 update.log_index,
216 ))
217 }
218 },
219 DexPoolData::FeeCollect(collect) => {
220 #[cfg(debug_assertions)]
221 let start = std::time::Instant::now();
222
223 self.process_collect(collect)?;
224
225 #[cfg(debug_assertions)]
226 {
227 self.analytics.collect_processing_time += start.elapsed();
228 }
229
230 self.analytics.total_fee_collects += 1;
231 self.last_processed_event = Some(BlockPosition::new(
232 collect.block,
233 collect.transaction_hash.clone(),
234 collect.transaction_index,
235 collect.log_index,
236 ))
237 }
238 DexPoolData::Flash(flash) => {
239 self.process_flash(flash)?;
240 self.analytics.total_flashes += 1;
241 self.last_processed_event = Some(BlockPosition::new(
242 flash.block,
243 flash.transaction_hash.clone(),
244 flash.transaction_index,
245 flash.log_index,
246 ))
247 }
248 }
249 Ok(())
250 }
251
252 pub fn process_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
276 self.check_if_initialized();
277 let zero_for_one = swap.amount0.is_positive();
278 let amount_specified = if zero_for_one {
279 swap.amount0
280 } else {
281 swap.amount1
282 };
283 let sqrt_price_limit_x96 = swap.sqrt_price_x96;
286 let (_, _) =
287 self.simulate_swap_through_ticks(amount_specified, zero_for_one, sqrt_price_limit_x96)?;
288
289 if swap.tick != self.state.current_tick {
291 tracing::error!(
292 "Inconsistency in swap processing: Current tick mismatch: simulated {}, event {}",
293 self.state.current_tick,
294 swap.tick
295 );
296 self.state.current_tick = swap.tick;
297 }
298 if swap.liquidity != self.tick_map.liquidity {
299 tracing::error!(
300 "Inconsistency in swap processing: Active liquidity mismatch: simulated {}, event {}",
301 self.tick_map.liquidity,
302 swap.liquidity
303 );
304 self.tick_map.liquidity = swap.liquidity;
305 }
306
307 Ok(())
308 }
309
310 pub fn execute_swap(
329 &mut self,
330 sender: Address,
331 recipient: Address,
332 block: BlockPosition,
333 zero_for_one: bool,
334 amount_specified: I256,
335 sqrt_price_limit_x96: U160,
336 ) -> anyhow::Result<PoolSwap> {
337 self.check_if_initialized();
338 let (amount0, amount1) =
339 self.simulate_swap_through_ticks(amount_specified, zero_for_one, sqrt_price_limit_x96)?;
340
341 self.analytics.total_swaps += 1;
342 let swap_event = PoolSwap::new(
343 self.pool.chain.clone(),
344 self.pool.dex.clone(),
345 self.pool.instrument_id,
346 self.pool.address,
347 block.number,
348 block.transaction_hash,
349 block.transaction_index,
350 block.log_index,
351 None,
352 sender,
353 recipient,
354 amount0,
355 amount1,
356 self.state.price_sqrt_ratio_x96,
357 self.tick_map.liquidity,
358 self.state.current_tick,
359 None,
360 None,
361 None,
362 );
363 Ok(swap_event)
364 }
365
366 pub fn simulate_swap_through_ticks(
394 &mut self,
395 amount_specified: I256,
396 zero_for_one: bool,
397 sqrt_price_limit_x96: U160,
398 ) -> anyhow::Result<(I256, I256)> {
399 let mut current_sqrt_price = self.state.price_sqrt_ratio_x96;
400 let mut current_tick = self.state.current_tick;
401 let exact_input = amount_specified.is_positive();
402 let mut amount_specified_remaining = amount_specified;
403 let mut amount_calculated = I256::ZERO;
404 let mut protocol_fee = U256::ZERO;
405 let fee_tier = self.pool.fee.expect("Pool fee should be initialized");
406 let fee_protocol = if zero_for_one {
408 self.state.fee_protocol % 16
410 } else {
411 self.state.fee_protocol >> 4
413 };
414
415 let mut current_fee_growth_global = if zero_for_one {
417 self.state.fee_growth_global_0
418 } else {
419 self.state.fee_growth_global_1
420 };
421
422 while amount_specified_remaining != I256::ZERO && sqrt_price_limit_x96 != current_sqrt_price
424 {
425 let sqrt_price_start_x96 = current_sqrt_price;
426
427 let (mut tick_next, initialized) = self
428 .tick_map
429 .next_initialized_tick(current_tick, zero_for_one);
430
431 tick_next = tick_next.clamp(PoolTick::MIN_TICK, PoolTick::MAX_TICK);
433
434 let sqrt_price_next = get_sqrt_ratio_at_tick(tick_next);
436
437 let sqrt_price_target = if (zero_for_one && sqrt_price_next < sqrt_price_limit_x96)
439 || (!zero_for_one && sqrt_price_next > sqrt_price_limit_x96)
440 {
441 sqrt_price_limit_x96
442 } else {
443 sqrt_price_next
444 };
445 let swap_step_result = compute_swap_step(
446 current_sqrt_price,
447 sqrt_price_target,
448 self.get_active_liquidity(),
449 amount_specified_remaining,
450 fee_tier,
451 )?;
452
453 current_sqrt_price = swap_step_result.sqrt_ratio_next_x96;
455
456 if exact_input {
458 amount_specified_remaining -= FullMath::truncate_to_i256(
460 swap_step_result.amount_in + swap_step_result.fee_amount,
461 );
462 amount_calculated -= FullMath::truncate_to_i256(swap_step_result.amount_out);
463 } else {
464 amount_specified_remaining +=
466 FullMath::truncate_to_i256(swap_step_result.amount_out);
467 amount_calculated += FullMath::truncate_to_i256(
468 swap_step_result.amount_in + swap_step_result.fee_amount,
469 );
470 }
471
472 let mut step_fee_amount = swap_step_result.fee_amount;
474 if fee_protocol > 0 {
475 let protocol_fee_delta = swap_step_result.fee_amount / U256::from(fee_protocol);
476 step_fee_amount -= protocol_fee_delta;
477 protocol_fee += protocol_fee_delta;
478 }
479
480 if self.tick_map.liquidity > 0 {
482 let fee_growth_delta =
483 FullMath::mul_div(step_fee_amount, Q128, U256::from(self.tick_map.liquidity))?;
484 current_fee_growth_global += fee_growth_delta;
485 }
486
487 if swap_step_result.sqrt_ratio_next_x96 == sqrt_price_next {
489 if initialized {
493 let liquidity_net = self.tick_map.cross_tick(
494 tick_next,
495 if zero_for_one {
496 current_fee_growth_global
497 } else {
498 self.state.fee_growth_global_0
499 },
500 if zero_for_one {
501 self.state.fee_growth_global_1
502 } else {
503 current_fee_growth_global
504 },
505 );
506
507 self.tick_map.liquidity = if zero_for_one {
511 liquidity_math_add(self.tick_map.liquidity, -liquidity_net)
512 } else {
513 liquidity_math_add(self.tick_map.liquidity, liquidity_net)
514 };
515 }
516
517 current_tick = if zero_for_one {
518 tick_next - 1
519 } else {
520 tick_next
521 };
522 } else if swap_step_result.sqrt_ratio_next_x96 != sqrt_price_start_x96 {
523 current_tick = get_tick_at_sqrt_ratio(current_sqrt_price);
526 }
527 }
528
529 if self.state.current_tick != current_tick {
531 self.state.current_tick = current_tick;
532 self.state.price_sqrt_ratio_x96 = current_sqrt_price;
533 } else {
534 self.state.price_sqrt_ratio_x96 = current_sqrt_price;
536 }
537
538 if zero_for_one {
540 self.state.fee_growth_global_0 = current_fee_growth_global;
541 self.state.protocol_fees_token0 += protocol_fee;
542 } else {
543 self.state.fee_growth_global_1 = current_fee_growth_global;
544 self.state.protocol_fees_token1 += protocol_fee;
545 }
546
547 let (amount0, amount1) = if zero_for_one == exact_input {
549 (
550 amount_specified - amount_specified_remaining,
551 amount_calculated,
552 )
553 } else {
554 (
555 amount_calculated,
556 amount_specified - amount_specified_remaining,
557 )
558 };
559
560 Ok((amount0, amount1))
561 }
562
563 pub fn swap_exact0_for_1(
572 &mut self,
573 sender: Address,
574 recipient: Address,
575 block: BlockPosition,
576 amount0_in: U256,
577 sqrt_price_limit_x96: Option<U160>,
578 ) -> anyhow::Result<PoolSwap> {
579 let amount_specified = I256::from(amount0_in);
580 let sqrt_price_limit_x96 = sqrt_price_limit_x96.unwrap_or(MIN_SQRT_RATIO + U160::from(1));
581 self.execute_swap(
582 sender,
583 recipient,
584 block,
585 true,
586 amount_specified,
587 sqrt_price_limit_x96,
588 )
589 }
590
591 pub fn swap_0_for_exact1(
600 &mut self,
601 sender: Address,
602 recipient: Address,
603 block: BlockPosition,
604 amount1_out: U256,
605 sqrt_price_limit_x96: Option<U160>,
606 ) -> anyhow::Result<PoolSwap> {
607 let amount_specified = -I256::from(amount1_out);
608 let sqrt_price_limit_x96 = sqrt_price_limit_x96.unwrap_or(MIN_SQRT_RATIO + U160::from(1));
609 self.execute_swap(
610 sender,
611 recipient,
612 block,
613 true,
614 amount_specified,
615 sqrt_price_limit_x96,
616 )
617 }
618
619 pub fn swap_exact1_for_0(
628 &mut self,
629 sender: Address,
630 recipient: Address,
631 block: BlockPosition,
632 amount1_in: U256,
633 sqrt_price_limit_x96: Option<U160>,
634 ) -> anyhow::Result<PoolSwap> {
635 let amount_specified = I256::from(amount1_in);
636 let sqrt_price_limit_x96 = sqrt_price_limit_x96.unwrap_or(MAX_SQRT_RATIO - U160::from(1));
637 self.execute_swap(
638 sender,
639 recipient,
640 block,
641 false,
642 amount_specified,
643 sqrt_price_limit_x96,
644 )
645 }
646
647 pub fn swap_1_for_exact0(
656 &mut self,
657 sender: Address,
658 recipient: Address,
659 block: BlockPosition,
660 amount0_out: U256,
661 sqrt_price_limit_x96: Option<U160>,
662 ) -> anyhow::Result<PoolSwap> {
663 let amount_specified = -I256::from(amount0_out);
664 let sqrt_price_limit_x96 = sqrt_price_limit_x96.unwrap_or(MAX_SQRT_RATIO - U160::from(1));
665 self.execute_swap(
666 sender,
667 recipient,
668 block,
669 false,
670 amount_specified,
671 sqrt_price_limit_x96,
672 )
673 }
674
675 pub fn swap_to_lower_sqrt_price(
683 &mut self,
684 sender: Address,
685 recipient: Address,
686 block: BlockPosition,
687 sqrt_price_limit_x96: U160,
688 ) -> anyhow::Result<PoolSwap> {
689 self.execute_swap(
690 sender,
691 recipient,
692 block,
693 true,
694 I256::MAX,
695 sqrt_price_limit_x96,
696 )
697 }
698
699 pub fn swap_to_higher_sqrt_price(
707 &mut self,
708 sender: Address,
709 recipient: Address,
710 block: BlockPosition,
711 sqrt_price_limit_x96: U160,
712 ) -> anyhow::Result<PoolSwap> {
713 self.execute_swap(
714 sender,
715 recipient,
716 block,
717 false,
718 I256::MAX,
719 sqrt_price_limit_x96,
720 )
721 }
722
723 pub fn process_mint(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
735 self.check_if_initialized();
736 self.validate_ticks(update.tick_lower, update.tick_upper)?;
737 self.add_liquidity(
738 &update.owner,
739 update.tick_lower,
740 update.tick_upper,
741 update.position_liquidity,
742 update.amount0,
743 update.amount1,
744 )?;
745 Ok(())
746 }
747
748 fn add_liquidity(
753 &mut self,
754 owner: &Address,
755 tick_lower: i32,
756 tick_upper: i32,
757 liquidity: u128,
758 amount0: U256,
759 amount1: U256,
760 ) -> anyhow::Result<()> {
761 self.update_position(
762 owner,
763 tick_lower,
764 tick_upper,
765 liquidity as i128,
766 amount0,
767 amount1,
768 )?;
769
770 self.analytics.total_amount0_deposited += amount0;
772 self.analytics.total_amount1_deposited += amount1;
773
774 Ok(())
775 }
776
777 pub fn execute_mint(
793 &mut self,
794 recipient: Address,
795 block: BlockPosition,
796 tick_lower: i32,
797 tick_upper: i32,
798 liquidity: u128,
799 ) -> anyhow::Result<PoolLiquidityUpdate> {
800 self.check_if_initialized();
801 self.validate_ticks(tick_lower, tick_upper)?;
802 let (amount0, amount1) = get_amounts_for_liquidity(
803 self.state.price_sqrt_ratio_x96,
804 tick_lower,
805 tick_upper,
806 liquidity,
807 true,
808 );
809 self.add_liquidity(
810 &recipient, tick_lower, tick_upper, liquidity, amount0, amount1,
811 )?;
812
813 self.analytics.total_mints += 1;
814 let event = PoolLiquidityUpdate::new(
815 self.pool.chain.clone(),
816 self.pool.dex.clone(),
817 self.pool.instrument_id,
818 self.pool.address,
819 PoolLiquidityUpdateType::Mint,
820 block.number,
821 block.transaction_hash,
822 block.transaction_index,
823 block.log_index,
824 None,
825 recipient,
826 liquidity,
827 amount0,
828 amount1,
829 tick_lower,
830 tick_upper,
831 None,
832 );
833
834 Ok(event)
835 }
836
837 pub fn process_burn(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
849 self.check_if_initialized();
850 self.validate_ticks(update.tick_lower, update.tick_upper)?;
851
852 self.update_position(
854 &update.owner,
855 update.tick_lower,
856 update.tick_upper,
857 -(update.position_liquidity as i128),
858 update.amount0,
859 update.amount1,
860 )?;
861
862 Ok(())
863 }
864
865 pub fn execute_burn(
882 &mut self,
883 recipient: Address,
884 block: BlockPosition,
885 tick_lower: i32,
886 tick_upper: i32,
887 liquidity: u128,
888 ) -> anyhow::Result<PoolLiquidityUpdate> {
889 self.check_if_initialized();
890 self.validate_ticks(tick_lower, tick_upper)?;
891 let (amount0, amount1) = get_amounts_for_liquidity(
892 self.state.price_sqrt_ratio_x96,
893 tick_lower,
894 tick_upper,
895 liquidity,
896 false,
897 );
898
899 self.update_position(
901 &recipient,
902 tick_lower,
903 tick_upper,
904 -(liquidity as i128),
905 amount0,
906 amount1,
907 )?;
908
909 self.analytics.total_burns += 1;
910 let event = PoolLiquidityUpdate::new(
911 self.pool.chain.clone(),
912 self.pool.dex.clone(),
913 self.pool.instrument_id,
914 self.pool.address,
915 PoolLiquidityUpdateType::Burn,
916 block.number,
917 block.transaction_hash,
918 block.transaction_index,
919 block.log_index,
920 None,
921 recipient,
922 liquidity,
923 amount0,
924 amount1,
925 tick_lower,
926 tick_upper,
927 None,
928 );
929
930 Ok(event)
931 }
932
933 pub fn process_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
946 self.check_if_initialized();
947
948 let position_key =
949 PoolPosition::get_position_key(&collect.owner, collect.tick_lower, collect.tick_upper);
950 if let Some(position) = self.positions.get_mut(&position_key) {
951 position.collect_fees(collect.amount0, collect.amount1);
952 }
953
954 self.analytics.total_amount0_collected += U256::from(collect.amount0);
955 self.analytics.total_amount1_collected += U256::from(collect.amount1);
956
957 Ok(())
958 }
959
960 pub fn process_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
972 self.check_if_initialized();
973 self.update_flash_state(flash.paid0, flash.paid1)
974 }
975
976 pub fn execute_flash(
991 &mut self,
992 sender: Address,
993 recipient: Address,
994 block: BlockPosition,
995 amount0: U256,
996 amount1: U256,
997 ) -> anyhow::Result<PoolFlash> {
998 self.check_if_initialized();
999 let fee_tier = self.pool.fee.expect("Pool fee should be initialized");
1000
1001 let paid0 = if amount0 > U256::ZERO {
1003 FullMath::mul_div_rounding_up(amount0, U256::from(fee_tier), U256::from(1_000_000))?
1004 } else {
1005 U256::ZERO
1006 };
1007
1008 let paid1 = if amount1 > U256::ZERO {
1009 FullMath::mul_div_rounding_up(amount1, U256::from(fee_tier), U256::from(1_000_000))?
1010 } else {
1011 U256::ZERO
1012 };
1013
1014 self.update_flash_state(paid0, paid1)?;
1015 self.analytics.total_flashes += 1;
1016
1017 let flash_event = PoolFlash::new(
1018 self.pool.chain.clone(),
1019 self.pool.dex.clone(),
1020 self.pool.instrument_id,
1021 self.pool.address,
1022 block.number,
1023 block.transaction_hash,
1024 block.transaction_index,
1025 block.log_index,
1026 None,
1027 sender,
1028 recipient,
1029 amount0,
1030 amount1,
1031 paid0,
1032 paid1,
1033 );
1034
1035 Ok(flash_event)
1036 }
1037
1038 fn update_flash_state(&mut self, paid0: U256, paid1: U256) -> anyhow::Result<()> {
1046 let liquidity = self.tick_map.liquidity;
1047 if liquidity == 0 {
1048 anyhow::bail!("No liquidity")
1049 }
1050
1051 let fee_protocol_0 = self.state.fee_protocol % 16;
1052 let fee_protocol_1 = self.state.fee_protocol >> 4;
1053
1054 if paid0 > U256::ZERO {
1056 let protocol_fee_0 = if fee_protocol_0 > 0 {
1057 paid0 / U256::from(fee_protocol_0)
1058 } else {
1059 U256::ZERO
1060 };
1061
1062 if protocol_fee_0 > U256::ZERO {
1063 self.state.protocol_fees_token0 += protocol_fee_0;
1064 }
1065
1066 let lp_fee_0 = paid0 - protocol_fee_0;
1067 let delta = FullMath::mul_div(lp_fee_0, Q128, U256::from(liquidity))?;
1068 self.state.fee_growth_global_0 += delta;
1069 }
1070
1071 if paid1 > U256::ZERO {
1073 let protocol_fee_1 = if fee_protocol_1 > 0 {
1074 paid1 / U256::from(fee_protocol_1)
1075 } else {
1076 U256::ZERO
1077 };
1078
1079 if protocol_fee_1 > U256::ZERO {
1080 self.state.protocol_fees_token1 += protocol_fee_1;
1081 }
1082
1083 let lp_fee_1 = paid1 - protocol_fee_1;
1084 let delta = FullMath::mul_div(lp_fee_1, Q128, U256::from(liquidity))?;
1085 self.state.fee_growth_global_1 += delta;
1086 }
1087
1088 Ok(())
1089 }
1090
1091 fn update_position(
1096 &mut self,
1097 owner: &Address,
1098 tick_lower: i32,
1099 tick_upper: i32,
1100 liquidity_delta: i128,
1101 amount0: U256,
1102 amount1: U256,
1103 ) -> anyhow::Result<()> {
1104 let current_tick = self.state.current_tick;
1105 let position_key = PoolPosition::get_position_key(owner, tick_lower, tick_upper);
1106 let position = self
1107 .positions
1108 .entry(position_key)
1109 .or_insert(PoolPosition::new(*owner, tick_lower, tick_upper, 0));
1110
1111 if liquidity_delta < 0 {
1113 let burn_amount = liquidity_delta.unsigned_abs();
1114 if position.liquidity < burn_amount {
1115 anyhow::bail!(
1116 "Position liquidity {} is less than the requested burn amount of {}",
1117 position.liquidity,
1118 burn_amount
1119 );
1120 }
1121 }
1122
1123 let flipped_lower = self.tick_map.update(
1125 tick_lower,
1126 current_tick,
1127 liquidity_delta,
1128 false,
1129 self.state.fee_growth_global_0,
1130 self.state.fee_growth_global_1,
1131 );
1132 let flipped_upper = self.tick_map.update(
1133 tick_upper,
1134 current_tick,
1135 liquidity_delta,
1136 true,
1137 self.state.fee_growth_global_0,
1138 self.state.fee_growth_global_1,
1139 );
1140
1141 let (fee_growth_inside_0, fee_growth_inside_1) = self.tick_map.get_fee_growth_inside(
1142 tick_lower,
1143 tick_upper,
1144 current_tick,
1145 self.state.fee_growth_global_0,
1146 self.state.fee_growth_global_1,
1147 );
1148 position.update_liquidity(liquidity_delta);
1149 position.update_fees(fee_growth_inside_0, fee_growth_inside_1);
1150 position.update_amounts(liquidity_delta, amount0, amount1);
1151
1152 if tick_lower <= current_tick && current_tick < tick_upper {
1154 self.tick_map.liquidity = ((self.tick_map.liquidity as i128) + liquidity_delta) as u128;
1155 }
1156
1157 if liquidity_delta < 0 && flipped_lower {
1159 self.tick_map.clear(tick_lower)
1160 }
1161 if liquidity_delta < 0 && flipped_upper {
1162 self.tick_map.clear(tick_upper)
1163 }
1164
1165 Ok(())
1166 }
1167
1168 fn validate_ticks(&self, tick_lower: i32, tick_upper: i32) -> anyhow::Result<()> {
1180 if tick_lower >= tick_upper {
1181 anyhow::bail!("Invalid tick range: {} >= {}", tick_lower, tick_upper)
1182 }
1183
1184 if tick_lower % self.pool.tick_spacing.unwrap() as i32 != 0
1185 || tick_upper % self.pool.tick_spacing.unwrap() as i32 != 0
1186 {
1187 anyhow::bail!(
1188 "Ticks {} and {} must be multiples of the tick spacing",
1189 tick_lower,
1190 tick_upper
1191 )
1192 }
1193
1194 if tick_lower < PoolTick::MIN_TICK || tick_upper > PoolTick::MAX_TICK {
1195 anyhow::bail!("Invalid tick bounds for {} and {}", tick_lower, tick_upper);
1196 }
1197 Ok(())
1198 }
1199
1200 #[must_use]
1209 pub fn get_active_liquidity(&self) -> u128 {
1210 self.tick_map.liquidity
1211 }
1212
1213 #[must_use]
1219 pub fn get_total_liquidity_from_active_positions(&self) -> u128 {
1220 self.positions
1221 .values()
1222 .filter(|position| {
1223 position.liquidity > 0
1224 && position.tick_lower <= self.state.current_tick
1225 && self.state.current_tick < position.tick_upper
1226 })
1227 .map(|position| position.liquidity)
1228 .sum()
1229 }
1230
1231 pub fn restore_from_snapshot(&mut self, snapshot: PoolSnapshot) -> anyhow::Result<()> {
1245 let liquidity = snapshot.state.liquidity;
1246
1247 self.state = snapshot.state;
1249
1250 self.analytics.total_amount0_deposited = snapshot.analytics.total_amount0_deposited;
1252 self.analytics.total_amount1_deposited = snapshot.analytics.total_amount1_deposited;
1253 self.analytics.total_amount0_collected = snapshot.analytics.total_amount0_collected;
1254 self.analytics.total_amount1_collected = snapshot.analytics.total_amount1_collected;
1255 self.analytics.total_swaps = snapshot.analytics.total_swaps;
1256 self.analytics.total_mints = snapshot.analytics.total_mints;
1257 self.analytics.total_burns = snapshot.analytics.total_burns;
1258 self.analytics.total_fee_collects = snapshot.analytics.total_fee_collects;
1259 self.analytics.total_flashes = snapshot.analytics.total_flashes;
1260
1261 self.positions.clear();
1263 for position in snapshot.positions {
1264 let key = PoolPosition::get_position_key(
1265 &position.owner,
1266 position.tick_lower,
1267 position.tick_upper,
1268 );
1269 self.positions.insert(key, position);
1270 }
1271
1272 self.tick_map = TickMap::new(
1274 self.pool
1275 .tick_spacing
1276 .expect("Pool tick spacing must be set"),
1277 );
1278 for tick in snapshot.ticks {
1279 self.tick_map.restore_tick(tick);
1280 }
1281
1282 self.tick_map.liquidity = liquidity;
1284
1285 self.last_processed_event = Some(snapshot.block_position);
1287
1288 self.is_initialized = true;
1290
1291 Ok(())
1292 }
1293
1294 pub fn get_active_tick_values(&self) -> Vec<i32> {
1299 self.tick_map
1300 .get_all_ticks()
1301 .iter()
1302 .filter(|(_, tick)| self.tick_map.is_tick_initialized(tick.value))
1303 .map(|(tick_value, _)| *tick_value)
1304 .collect()
1305 }
1306
1307 #[must_use]
1309 pub fn get_active_tick_count(&self) -> usize {
1310 self.tick_map.active_tick_count()
1311 }
1312
1313 pub fn get_tick(&self, tick: i32) -> Option<&PoolTick> {
1318 self.tick_map.get_tick(tick)
1319 }
1320
1321 pub fn get_current_tick(&self) -> i32 {
1326 self.state.current_tick
1327 }
1328
1329 pub fn get_total_tick_count(&self) -> usize {
1337 self.tick_map.total_tick_count()
1338 }
1339
1340 pub fn get_position(
1345 &self,
1346 owner: &Address,
1347 tick_lower: i32,
1348 tick_upper: i32,
1349 ) -> Option<&PoolPosition> {
1350 let position_key = PoolPosition::get_position_key(owner, tick_lower, tick_upper);
1351 self.positions.get(&position_key)
1352 }
1353
1354 pub fn get_active_positions(&self) -> Vec<&PoolPosition> {
1364 self.positions
1365 .values()
1366 .filter(|position| {
1367 let current_tick = self.get_current_tick();
1368 position.liquidity > 0
1369 && position.tick_lower <= current_tick
1370 && current_tick < position.tick_upper
1371 })
1372 .collect()
1373 }
1374
1375 pub fn get_all_positions(&self) -> Vec<&PoolPosition> {
1384 self.positions.values().collect()
1385 }
1386
1387 pub fn extract_snapshot(&self) -> PoolSnapshot {
1398 let positions: Vec<_> = self.positions.values().cloned().collect();
1399 let ticks: Vec<_> = self.tick_map.get_all_ticks().values().cloned().collect();
1400
1401 let mut state = self.state.clone();
1402 state.liquidity = self.tick_map.liquidity;
1403
1404 PoolSnapshot::new(
1405 self.pool.instrument_id,
1406 state,
1407 positions,
1408 ticks,
1409 self.analytics.clone(),
1410 self.last_processed_event
1411 .clone()
1412 .expect("No events processed yet"),
1413 )
1414 }
1415
1416 pub fn get_total_active_positions(&self) -> usize {
1421 self.positions
1422 .iter()
1423 .filter(|(_, position)| {
1424 let current_tick = self.get_current_tick();
1425 position.liquidity > 0
1426 && position.tick_lower <= current_tick
1427 && current_tick < position.tick_upper
1428 })
1429 .count()
1430 }
1431
1432 pub fn get_total_inactive_positions(&self) -> usize {
1437 self.positions.len() - self.get_total_active_positions()
1438 }
1439
1440 pub fn estimate_balance_of_token0(&self) -> U256 {
1447 let mut total_amount0 = U256::ZERO;
1448 let current_sqrt_price = self.state.price_sqrt_ratio_x96;
1449 let current_tick = self.state.current_tick;
1450 let mut total_fees_0_collected: u128 = 0;
1451
1452 for position in self.positions.values() {
1454 if position.liquidity > 0 {
1455 if position.tick_upper <= current_tick {
1456 continue;
1458 } else if position.tick_lower > current_tick {
1459 let sqrt_ratio_a = get_sqrt_ratio_at_tick(position.tick_lower);
1461 let sqrt_ratio_b = get_sqrt_ratio_at_tick(position.tick_upper);
1462 let amount0 =
1463 get_amount0_delta(sqrt_ratio_a, sqrt_ratio_b, position.liquidity, true);
1464 total_amount0 += amount0;
1465 } else {
1466 let sqrt_ratio_upper = get_sqrt_ratio_at_tick(position.tick_upper);
1468 let amount0 = get_amount0_delta(
1469 current_sqrt_price,
1470 sqrt_ratio_upper,
1471 position.liquidity,
1472 true,
1473 );
1474 total_amount0 += amount0;
1475 }
1476 }
1477
1478 total_fees_0_collected += position.total_amount0_collected;
1479 }
1480
1481 let fee_growth_0 = self.state.fee_growth_global_0;
1485 if fee_growth_0 > U256::ZERO {
1486 let active_liquidity = self.get_active_liquidity();
1489 if active_liquidity > 0 {
1490 if let Ok(total_fees_0) =
1493 FullMath::mul_div(fee_growth_0, U256::from(active_liquidity), Q128)
1494 {
1495 total_amount0 += total_fees_0;
1496 }
1497 }
1498 }
1499
1500 let total_fees_0_left = fee_growth_0 - U256::from(total_fees_0_collected);
1501
1502 total_amount0 += self.state.protocol_fees_token0;
1504
1505 total_amount0 + total_fees_0_left
1506 }
1507
1508 pub fn estimate_balance_of_token1(&self) -> U256 {
1515 let mut total_amount1 = U256::ZERO;
1516 let current_sqrt_price = self.state.price_sqrt_ratio_x96;
1517 let current_tick = self.state.current_tick;
1518 let mut total_fees_1_collected: u128 = 0;
1519
1520 for position in self.positions.values() {
1522 if position.liquidity > 0 {
1523 if position.tick_lower > current_tick {
1524 continue;
1526 } else if position.tick_upper <= current_tick {
1527 let sqrt_ratio_a = get_sqrt_ratio_at_tick(position.tick_lower);
1529 let sqrt_ratio_b = get_sqrt_ratio_at_tick(position.tick_upper);
1530 let amount1 =
1531 get_amount1_delta(sqrt_ratio_a, sqrt_ratio_b, position.liquidity, true);
1532 total_amount1 += amount1;
1533 } else {
1534 let sqrt_ratio_lower = get_sqrt_ratio_at_tick(position.tick_lower);
1536 let amount1 = get_amount1_delta(
1537 sqrt_ratio_lower,
1538 current_sqrt_price,
1539 position.liquidity,
1540 true,
1541 );
1542 total_amount1 += amount1;
1543 }
1544 }
1545
1546 total_fees_1_collected += position.total_amount1_collected;
1548 }
1549
1550 let fee_growth_1 = self.state.fee_growth_global_1;
1552 if fee_growth_1 > U256::ZERO {
1553 let active_liquidity = self.get_active_liquidity();
1554 if active_liquidity > 0 {
1555 if let Ok(total_fees_1) =
1557 FullMath::mul_div(fee_growth_1, U256::from(active_liquidity), Q128)
1558 {
1559 total_amount1 += total_fees_1;
1560 }
1561 }
1562 }
1563
1564 let total_fees_1_left = fee_growth_1 - U256::from(total_fees_1_collected);
1565
1566 total_amount1 += self.state.protocol_fees_token1;
1568
1569 total_amount1 + total_fees_1_left
1570 }
1571
1572 pub fn set_fee_growth_global(&mut self, fee_growth_global_0: U256, fee_growth_global_1: U256) {
1581 self.state.fee_growth_global_0 = fee_growth_global_0;
1582 self.state.fee_growth_global_1 = fee_growth_global_1;
1583 }
1584
1585 #[cfg(debug_assertions)]
1587 pub fn get_total_processing_time(&self) -> std::time::Duration {
1588 self.analytics.swap_processing_time
1589 + self.analytics.mint_processing_time
1590 + self.analytics.burn_processing_time
1591 + self.analytics.collect_processing_time
1592 }
1593
1594 pub fn get_total_events(&self) -> u64 {
1596 self.analytics.total_swaps
1597 + self.analytics.total_mints
1598 + self.analytics.total_burns
1599 + self.analytics.total_fee_collects
1600 + self.analytics.total_flashes
1601 }
1602
1603 #[cfg(debug_assertions)]
1605 pub fn log_performance_report(
1606 &self,
1607 total_time: std::time::Duration,
1608 streaming_time: std::time::Duration,
1609 ) {
1610 use thousands::Separable;
1611
1612 let processing_time = self.get_total_processing_time();
1613 let total_events = self.get_total_events();
1614
1615 log::info!("═══════════════════════════════════════════════════════");
1616 log::info!(" Profiling Performance Report");
1617 log::info!("═══════════════════════════════════════════════════════");
1618 log::info!("");
1619 log::info!("Total Time: {:.2}s", total_time.as_secs_f64());
1620 log::info!(
1621 "├─ Database Streaming: {:.2}s ({:.1}%)",
1622 streaming_time.as_secs_f64(),
1623 (streaming_time.as_secs_f64() / total_time.as_secs_f64()) * 100.0
1624 );
1625 log::info!(
1626 "└─ Event Processing: {:.2}s ({:.1}%)",
1627 processing_time.as_secs_f64(),
1628 (processing_time.as_secs_f64() / total_time.as_secs_f64()) * 100.0
1629 );
1630 log::info!("");
1631 log::info!(
1632 "Event Processing Breakdown: {:.2}s",
1633 processing_time.as_secs_f64()
1634 );
1635
1636 if self.analytics.total_swaps > 0 {
1637 let swap_time = self.analytics.swap_processing_time.as_secs_f64();
1638 log::info!(
1639 "├─ Swaps: {:.2}s ({:.1}%) - {} events → {} events/sec",
1640 swap_time,
1641 (swap_time / processing_time.as_secs_f64()) * 100.0,
1642 self.analytics.total_swaps.separate_with_commas(),
1643 ((self.analytics.total_swaps as f64 / swap_time) as u64).separate_with_commas()
1644 );
1645 }
1646
1647 if self.analytics.total_mints > 0 {
1648 let mint_time = self.analytics.mint_processing_time.as_secs_f64();
1649 log::info!(
1650 "├─ Mints: {:.2}s ({:.1}%) - {} events → {} events/sec",
1651 mint_time,
1652 (mint_time / processing_time.as_secs_f64()) * 100.0,
1653 self.analytics.total_mints.separate_with_commas(),
1654 ((self.analytics.total_mints as f64 / mint_time) as u64).separate_with_commas()
1655 );
1656 }
1657
1658 if self.analytics.total_burns > 0 {
1659 let burn_time = self.analytics.burn_processing_time.as_secs_f64();
1660 log::info!(
1661 "├─ Burns: {:.2}s ({:.1}%) - {} events → {} events/sec",
1662 burn_time,
1663 (burn_time / processing_time.as_secs_f64()) * 100.0,
1664 self.analytics.total_burns.separate_with_commas(),
1665 ((self.analytics.total_burns as f64 / burn_time) as u64).separate_with_commas()
1666 );
1667 }
1668
1669 if self.analytics.total_fee_collects > 0 {
1670 let collect_time = self.analytics.collect_processing_time.as_secs_f64();
1671 log::info!(
1672 "└─ Collects: {:.2}s ({:.1}%) - {} events → {} events/sec",
1673 collect_time,
1674 (collect_time / processing_time.as_secs_f64()) * 100.0,
1675 self.analytics.total_fee_collects.separate_with_commas(),
1676 ((self.analytics.total_fee_collects as f64 / collect_time) as u64)
1677 .separate_with_commas()
1678 );
1679 }
1680
1681 log::info!("");
1682 log::info!(
1683 "Overall Throughput: {} events/sec",
1684 ((total_events as f64 / total_time.as_secs_f64()) as u64).separate_with_commas()
1685 );
1686 log::info!(
1687 "Processing Throughput: {} events/sec",
1688 ((total_events as f64 / processing_time.as_secs_f64()) as u64).separate_with_commas()
1689 );
1690 log::info!("═══════════════════════════════════════════════════════");
1691 }
1692}