Skip to main content

nautilus_data/engine/
pool.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Message handlers that maintain the pool profiler state stored in the global [`Cache`].
//!
//! The handlers are functionally equivalent to `BookUpdater` but for DeFi liquidity
//! pools. Whenever a [`PoolSwap`], [`PoolLiquidityUpdate`], [`PoolFeeCollect`] or
//! [`PoolFlash`] is published on the message bus, the matching handler looks up the
//! pool profiler registered for the instrument in the cache and applies the event to
//! it in-place. Events for an instrument without a registered pool profiler are
//! ignored, and processing failures are logged as errors rather than propagated.
23
24use std::{cell::RefCell, rc::Rc};
25
26use nautilus_common::{cache::Cache, msgbus::Handler};
27use nautilus_model::{
28    defi::{PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap},
29    identifiers::InstrumentId,
30};
31use ustr::Ustr;
32
33/// Handles [`PoolSwap`]s and [`PoolLiquidityUpdate`]s for a single AMM pool.
34#[derive(Debug)]
35pub struct PoolUpdater {
36    id: Ustr,
37    instrument_id: InstrumentId,
38    cache: Rc<RefCell<Cache>>,
39}
40
41impl PoolUpdater {
42    /// Creates a new [`PoolUpdater`] bound to the given `instrument_id` and `cache`.
43    #[must_use]
44    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
45        Self {
46            id: Ustr::from(&format!("{}-{}", stringify!(PoolUpdater), instrument_id)),
47            instrument_id: *instrument_id,
48            cache,
49        }
50    }
51
52    /// Returns the handler ID.
53    #[must_use]
54    pub fn id(&self) -> Ustr {
55        self.id
56    }
57
58    /// Handles a pool swap event.
59    pub fn handle_pool_swap(&self, swap: &PoolSwap) {
60        if let Some(pool_profiler) = self
61            .cache
62            .borrow_mut()
63            .pool_profiler_mut(&self.instrument_id)
64            && let Err(e) = pool_profiler.process_swap(swap)
65        {
66            log::error!("Failed to process pool swap: {e}");
67        }
68    }
69
70    /// Handles a pool liquidity update event.
71    ///
72    /// # Panics
73    ///
74    /// Panics if `update.kind` is not `Mint` or `Burn`.
75    pub fn handle_pool_liquidity_update(&self, update: &PoolLiquidityUpdate) {
76        if let Some(pool_profiler) = self
77            .cache
78            .borrow_mut()
79            .pool_profiler_mut(&self.instrument_id)
80            && let Err(e) = match update.kind {
81                PoolLiquidityUpdateType::Mint => pool_profiler.process_mint(update),
82                PoolLiquidityUpdateType::Burn => pool_profiler.process_burn(update),
83                _ => panic!("Liquidity update operation {} not implemented", update.kind),
84            }
85        {
86            log::error!("Failed to process pool liquidity update: {e}");
87        }
88    }
89
90    /// Handles a pool fee collect event.
91    pub fn handle_pool_fee_collect(&self, event: &PoolFeeCollect) {
92        if let Some(pool_profiler) = self
93            .cache
94            .borrow_mut()
95            .pool_profiler_mut(&self.instrument_id)
96            && let Err(e) = pool_profiler.process_collect(event)
97        {
98            log::error!("Failed to process pool fee collect: {e}");
99        }
100    }
101
102    /// Handles a pool flash event.
103    pub fn handle_pool_flash(&self, event: &PoolFlash) {
104        if let Some(pool_profiler) = self
105            .cache
106            .borrow_mut()
107            .pool_profiler_mut(&self.instrument_id)
108            && let Err(e) = pool_profiler.process_flash(event)
109        {
110            log::error!("Failed to process pool flash: {e}");
111        }
112    }
113}
114
115// -- Typed handler wrappers -----------------------------------------------------
116
117/// Handler for pool swap events that delegates to a [`PoolUpdater`].
118#[derive(Debug)]
119pub struct PoolSwapHandler {
120    id: Ustr,
121    updater: Rc<PoolUpdater>,
122}
123
124impl PoolSwapHandler {
125    /// Creates a new swap handler delegating to the given updater.
126    #[must_use]
127    pub fn new(updater: Rc<PoolUpdater>) -> Self {
128        Self {
129            id: Ustr::from(&format!("PoolSwapHandler-{}", updater.id())),
130            updater,
131        }
132    }
133}
134
135impl Handler<PoolSwap> for PoolSwapHandler {
136    fn id(&self) -> Ustr {
137        self.id
138    }
139
140    fn handle(&self, msg: &PoolSwap) {
141        self.updater.handle_pool_swap(msg);
142    }
143}
144
145/// Handler for pool liquidity update events that delegates to a [`PoolUpdater`].
146#[derive(Debug)]
147pub struct PoolLiquidityHandler {
148    id: Ustr,
149    updater: Rc<PoolUpdater>,
150}
151
152impl PoolLiquidityHandler {
153    /// Creates a new liquidity handler delegating to the given updater.
154    #[must_use]
155    pub fn new(updater: Rc<PoolUpdater>) -> Self {
156        Self {
157            id: Ustr::from(&format!("PoolLiquidityHandler-{}", updater.id())),
158            updater,
159        }
160    }
161}
162
163impl Handler<PoolLiquidityUpdate> for PoolLiquidityHandler {
164    fn id(&self) -> Ustr {
165        self.id
166    }
167
168    fn handle(&self, msg: &PoolLiquidityUpdate) {
169        self.updater.handle_pool_liquidity_update(msg);
170    }
171}
172
173/// Handler for pool fee collect events that delegates to a [`PoolUpdater`].
174#[derive(Debug)]
175pub struct PoolCollectHandler {
176    id: Ustr,
177    updater: Rc<PoolUpdater>,
178}
179
180impl PoolCollectHandler {
181    /// Creates a new collect handler delegating to the given updater.
182    #[must_use]
183    pub fn new(updater: Rc<PoolUpdater>) -> Self {
184        Self {
185            id: Ustr::from(&format!("PoolCollectHandler-{}", updater.id())),
186            updater,
187        }
188    }
189}
190
191impl Handler<PoolFeeCollect> for PoolCollectHandler {
192    fn id(&self) -> Ustr {
193        self.id
194    }
195
196    fn handle(&self, msg: &PoolFeeCollect) {
197        self.updater.handle_pool_fee_collect(msg);
198    }
199}
200
201/// Handler for pool flash events that delegates to a [`PoolUpdater`].
202#[derive(Debug)]
203pub struct PoolFlashHandler {
204    id: Ustr,
205    updater: Rc<PoolUpdater>,
206}
207
208impl PoolFlashHandler {
209    /// Creates a new flash handler delegating to the given updater.
210    #[must_use]
211    pub fn new(updater: Rc<PoolUpdater>) -> Self {
212        Self {
213            id: Ustr::from(&format!("PoolFlashHandler-{}", updater.id())),
214            updater,
215        }
216    }
217}
218
219impl Handler<PoolFlash> for PoolFlashHandler {
220    fn id(&self) -> Ustr {
221        self.id
222    }
223
224    fn handle(&self, msg: &PoolFlash) {
225        self.updater.handle_pool_flash(msg);
226    }
227}