nautilus_data/engine/
pool.rs1use std::{cell::RefCell, rc::Rc};
25
26use nautilus_common::{cache::Cache, msgbus::Handler};
27use nautilus_model::{
28 defi::{PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap},
29 identifiers::InstrumentId,
30};
31use ustr::Ustr;
32
33#[derive(Debug)]
35pub struct PoolUpdater {
36 id: Ustr,
37 instrument_id: InstrumentId,
38 cache: Rc<RefCell<Cache>>,
39}
40
41impl PoolUpdater {
42 #[must_use]
44 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
45 Self {
46 id: Ustr::from(&format!("{}-{}", stringify!(PoolUpdater), instrument_id)),
47 instrument_id: *instrument_id,
48 cache,
49 }
50 }
51
52 #[must_use]
54 pub fn id(&self) -> Ustr {
55 self.id
56 }
57
58 pub fn handle_pool_swap(&self, swap: &PoolSwap) {
60 if let Some(pool_profiler) = self
61 .cache
62 .borrow_mut()
63 .pool_profiler_mut(&self.instrument_id)
64 && let Err(e) = pool_profiler.process_swap(swap)
65 {
66 log::error!("Failed to process pool swap: {e}");
67 }
68 }
69
70 pub fn handle_pool_liquidity_update(&self, update: &PoolLiquidityUpdate) {
76 if let Some(pool_profiler) = self
77 .cache
78 .borrow_mut()
79 .pool_profiler_mut(&self.instrument_id)
80 && let Err(e) = match update.kind {
81 PoolLiquidityUpdateType::Mint => pool_profiler.process_mint(update),
82 PoolLiquidityUpdateType::Burn => pool_profiler.process_burn(update),
83 _ => panic!("Liquidity update operation {} not implemented", update.kind),
84 }
85 {
86 log::error!("Failed to process pool liquidity update: {e}");
87 }
88 }
89
90 pub fn handle_pool_fee_collect(&self, event: &PoolFeeCollect) {
92 if let Some(pool_profiler) = self
93 .cache
94 .borrow_mut()
95 .pool_profiler_mut(&self.instrument_id)
96 && let Err(e) = pool_profiler.process_collect(event)
97 {
98 log::error!("Failed to process pool fee collect: {e}");
99 }
100 }
101
102 pub fn handle_pool_flash(&self, event: &PoolFlash) {
104 if let Some(pool_profiler) = self
105 .cache
106 .borrow_mut()
107 .pool_profiler_mut(&self.instrument_id)
108 && let Err(e) = pool_profiler.process_flash(event)
109 {
110 log::error!("Failed to process pool flash: {e}");
111 }
112 }
113}
114
115#[derive(Debug)]
119pub struct PoolSwapHandler {
120 id: Ustr,
121 updater: Rc<PoolUpdater>,
122}
123
124impl PoolSwapHandler {
125 #[must_use]
127 pub fn new(updater: Rc<PoolUpdater>) -> Self {
128 Self {
129 id: Ustr::from(&format!("PoolSwapHandler-{}", updater.id())),
130 updater,
131 }
132 }
133}
134
135impl Handler<PoolSwap> for PoolSwapHandler {
136 fn id(&self) -> Ustr {
137 self.id
138 }
139
140 fn handle(&self, msg: &PoolSwap) {
141 self.updater.handle_pool_swap(msg);
142 }
143}
144
145#[derive(Debug)]
147pub struct PoolLiquidityHandler {
148 id: Ustr,
149 updater: Rc<PoolUpdater>,
150}
151
152impl PoolLiquidityHandler {
153 #[must_use]
155 pub fn new(updater: Rc<PoolUpdater>) -> Self {
156 Self {
157 id: Ustr::from(&format!("PoolLiquidityHandler-{}", updater.id())),
158 updater,
159 }
160 }
161}
162
163impl Handler<PoolLiquidityUpdate> for PoolLiquidityHandler {
164 fn id(&self) -> Ustr {
165 self.id
166 }
167
168 fn handle(&self, msg: &PoolLiquidityUpdate) {
169 self.updater.handle_pool_liquidity_update(msg);
170 }
171}
172
173#[derive(Debug)]
175pub struct PoolCollectHandler {
176 id: Ustr,
177 updater: Rc<PoolUpdater>,
178}
179
180impl PoolCollectHandler {
181 #[must_use]
183 pub fn new(updater: Rc<PoolUpdater>) -> Self {
184 Self {
185 id: Ustr::from(&format!("PoolCollectHandler-{}", updater.id())),
186 updater,
187 }
188 }
189}
190
191impl Handler<PoolFeeCollect> for PoolCollectHandler {
192 fn id(&self) -> Ustr {
193 self.id
194 }
195
196 fn handle(&self, msg: &PoolFeeCollect) {
197 self.updater.handle_pool_fee_collect(msg);
198 }
199}
200
201#[derive(Debug)]
203pub struct PoolFlashHandler {
204 id: Ustr,
205 updater: Rc<PoolUpdater>,
206}
207
208impl PoolFlashHandler {
209 #[must_use]
211 pub fn new(updater: Rc<PoolUpdater>) -> Self {
212 Self {
213 id: Ustr::from(&format!("PoolFlashHandler-{}", updater.id())),
214 updater,
215 }
216 }
217}
218
219impl Handler<PoolFlash> for PoolFlashHandler {
220 fn id(&self) -> Ustr {
221 self.id
222 }
223
224 fn handle(&self, msg: &PoolFlash) {
225 self.updater.handle_pool_flash(msg);
226 }
227}