nautilus_data/engine/
pool.rs1use std::{any::Any, cell::RefCell, rc::Rc};
25
26use nautilus_common::{cache::Cache, msgbus::handler::MessageHandler};
27use nautilus_model::{
28 defi::{PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap},
29 identifiers::InstrumentId,
30};
31use ustr::Ustr;
32
33#[derive(Debug)]
35pub struct PoolUpdater {
36 id: Ustr,
37 instrument_id: InstrumentId,
38 cache: Rc<RefCell<Cache>>,
39}
40
41impl PoolUpdater {
42 #[must_use]
44 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
45 Self {
46 id: Ustr::from(&format!("{}-{}", stringify!(PoolUpdater), instrument_id)),
47 instrument_id: *instrument_id,
48 cache,
49 }
50 }
51
52 fn handle_pool_swap(&self, swap: &PoolSwap) {
53 if let Some(pool_profiler) = self
54 .cache
55 .borrow_mut()
56 .pool_profiler_mut(&self.instrument_id)
57 && let Err(e) = pool_profiler.process_swap(swap)
58 {
59 log::error!("Failed to process pool swap: {e}");
60 }
61 }
62
63 fn handle_pool_liquidity_update(&self, update: &PoolLiquidityUpdate) {
64 if let Some(pool_profiler) = self
65 .cache
66 .borrow_mut()
67 .pool_profiler_mut(&self.instrument_id)
68 && let Err(e) = match update.kind {
69 PoolLiquidityUpdateType::Mint => pool_profiler.process_mint(update),
70 PoolLiquidityUpdateType::Burn => pool_profiler.process_burn(update),
71 _ => panic!("Liquidity update operation {} not implemented", update.kind),
72 }
73 {
74 log::error!("Failed to process pool liquidity update: {e}");
75 }
76 }
77
78 fn handle_pool_fee_collect(&self, event: &PoolFeeCollect) {
79 if let Some(pool_profiler) = self
80 .cache
81 .borrow_mut()
82 .pool_profiler_mut(&self.instrument_id)
83 && let Err(e) = pool_profiler.process_collect(event)
84 {
85 log::error!("Failed to process pool fee collect: {e}");
86 }
87 }
88
89 fn handle_pool_flash(&self, event: &PoolFlash) {
90 if let Some(pool_profiler) = self
91 .cache
92 .borrow_mut()
93 .pool_profiler_mut(&self.instrument_id)
94 && let Err(e) = pool_profiler.process_flash(event)
95 {
96 log::error!("Failed to process pool flash: {e}");
97 }
98 }
99}
100
101impl MessageHandler for PoolUpdater {
102 fn id(&self) -> Ustr {
103 self.id
104 }
105
106 fn handle(&self, message: &dyn Any) {
107 if let Some(swap) = message.downcast_ref::<PoolSwap>() {
108 self.handle_pool_swap(swap);
109 return;
110 }
111
112 if let Some(update) = message.downcast_ref::<PoolLiquidityUpdate>() {
113 self.handle_pool_liquidity_update(update);
114 return;
115 }
116
117 if let Some(update) = message.downcast_ref::<PoolFeeCollect>() {
118 self.handle_pool_fee_collect(update);
119 return;
120 }
121
122 if let Some(flash) = message.downcast_ref::<PoolFlash>() {
123 self.handle_pool_flash(flash);
124 }
125 }
126
127 fn as_any(&self) -> &dyn Any {
128 self
129 }
130}