nautilus_data/defi/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific data client functionality.
17//!
18//! This module provides DeFi subscription and request helper methods
19//! for the `DataClientAdapter`. All code in this module requires the `defi` feature flag.
20
21use nautilus_common::{
22    clients::log_command_error,
23    messages::defi::{
24        DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
25        SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
26        SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
27        UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates,
28        UnsubscribePoolSwaps,
29    },
30};
31
32use crate::client::DataClientAdapter;
33
34impl DataClientAdapter {
35    #[inline]
36    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
37        if let Err(e) = match cmd {
38            DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
39            DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
40            DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
41            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
42                self.subscribe_pool_liquidity_updates(cmd)
43            }
44            DefiSubscribeCommand::PoolFeeCollects(cmd) => self.subscribe_pool_fee_collects(cmd),
45            DefiSubscribeCommand::PoolFlashEvents(cmd) => self.subscribe_pool_flash_events(cmd),
46        } {
47            log_command_error(&cmd, &e);
48        }
49    }
50
51    #[inline]
52    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
53        if let Err(e) = match cmd {
54            DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
55            DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
56            DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
57            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
58                self.unsubscribe_pool_liquidity_updates(cmd)
59            }
60            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => self.unsubscribe_pool_fee_collects(cmd),
61            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => self.unsubscribe_pool_flash_events(cmd),
62        } {
63            log_command_error(&cmd, &e);
64        }
65    }
66
67    /// Executes a DeFi data request command by dispatching to the appropriate handler.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the underlying client request fails.
72    #[inline]
73    pub fn execute_defi_request(&self, cmd: &DefiRequestCommand) -> anyhow::Result<()> {
74        match cmd {
75            DefiRequestCommand::PoolSnapshot(cmd) => self.request_pool_snapshot(cmd),
76        }
77    }
78
79    /// Subscribes to block events for the specified blockchain.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the underlying client subscribe operation fails.
84    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
85        if !self.subscriptions_blocks.contains(&cmd.chain) {
86            self.subscriptions_blocks.insert(cmd.chain);
87            self.client.subscribe_blocks(cmd)?;
88        }
89        Ok(())
90    }
91
92    /// Unsubscribes from block events for the specified blockchain.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the underlying client unsubscribe operation fails.
97    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
98        if self.subscriptions_blocks.contains(&cmd.chain) {
99            self.subscriptions_blocks.remove(&cmd.chain);
100            self.client.unsubscribe_blocks(cmd)?;
101        }
102        Ok(())
103    }
104
105    /// Subscribes to pool definition updates for the specified AMM pool.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the underlying client subscribe operation fails.
110    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
111        if !self.subscriptions_pools.contains(&cmd.instrument_id) {
112            self.subscriptions_pools.insert(cmd.instrument_id);
113            self.client.subscribe_pool(cmd)?;
114        }
115        Ok(())
116    }
117
118    /// Subscribes to pool swap events for the specified AMM pool.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if the underlying client subscribe operation fails.
123    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
124        if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
125            self.subscriptions_pool_swaps.insert(cmd.instrument_id);
126            self.client.subscribe_pool_swaps(cmd)?;
127        }
128        Ok(())
129    }
130
131    /// Subscribes to pool liquidity update events for the specified AMM pool.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the underlying client subscribe operation fails.
136    fn subscribe_pool_liquidity_updates(
137        &mut self,
138        cmd: &SubscribePoolLiquidityUpdates,
139    ) -> anyhow::Result<()> {
140        if !self
141            .subscriptions_pool_liquidity_updates
142            .contains(&cmd.instrument_id)
143        {
144            self.subscriptions_pool_liquidity_updates
145                .insert(cmd.instrument_id);
146            self.client.subscribe_pool_liquidity_updates(cmd)?;
147        }
148        Ok(())
149    }
150
151    /// Subscribes to pool fee collect events for the specified AMM pool.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the underlying client subscribe operation fails.
156    fn subscribe_pool_fee_collects(
157        &mut self,
158        cmd: &SubscribePoolFeeCollects,
159    ) -> anyhow::Result<()> {
160        if !self
161            .subscriptions_pool_fee_collects
162            .contains(&cmd.instrument_id)
163        {
164            self.subscriptions_pool_fee_collects
165                .insert(cmd.instrument_id);
166            self.client.subscribe_pool_fee_collects(cmd)?;
167        }
168        Ok(())
169    }
170
171    /// Subscribes to pool flash loan events for the specified AMM pool.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the underlying client subscribe operation fails.
176    fn subscribe_pool_flash_events(
177        &mut self,
178        cmd: &SubscribePoolFlashEvents,
179    ) -> anyhow::Result<()> {
180        if !self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
181            self.subscriptions_pool_flash.insert(cmd.instrument_id);
182            self.client.subscribe_pool_flash_events(cmd)?;
183        }
184        Ok(())
185    }
186
187    /// Unsubscribes from pool definition updates for the specified AMM pool.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the underlying client unsubscribe operation fails.
192    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
193        if self.subscriptions_pools.contains(&cmd.instrument_id) {
194            self.subscriptions_pools.remove(&cmd.instrument_id);
195            self.client.unsubscribe_pool(cmd)?;
196        }
197        Ok(())
198    }
199
200    /// Unsubscribes from swap events for the specified AMM pool.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if the underlying client unsubscribe operation fails.
205    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
206        if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
207            self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
208            self.client.unsubscribe_pool_swaps(cmd)?;
209        }
210        Ok(())
211    }
212
213    /// Unsubscribes from pool liquidity update events for the specified AMM pool.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the underlying client unsubscribe operation fails.
218    fn unsubscribe_pool_liquidity_updates(
219        &mut self,
220        cmd: &UnsubscribePoolLiquidityUpdates,
221    ) -> anyhow::Result<()> {
222        if self
223            .subscriptions_pool_liquidity_updates
224            .contains(&cmd.instrument_id)
225        {
226            self.subscriptions_pool_liquidity_updates
227                .remove(&cmd.instrument_id);
228            self.client.unsubscribe_pool_liquidity_updates(cmd)?;
229        }
230        Ok(())
231    }
232
233    /// Unsubscribes from pool fee collect events for the specified AMM pool.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the underlying client unsubscribe operation fails.
238    fn unsubscribe_pool_fee_collects(
239        &mut self,
240        cmd: &UnsubscribePoolFeeCollects,
241    ) -> anyhow::Result<()> {
242        if self
243            .subscriptions_pool_fee_collects
244            .contains(&cmd.instrument_id)
245        {
246            self.subscriptions_pool_fee_collects
247                .remove(&cmd.instrument_id);
248            self.client.unsubscribe_pool_fee_collects(cmd)?;
249        }
250        Ok(())
251    }
252
253    /// Unsubscribes from pool flash loan events for the specified AMM pool.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if the underlying client unsubscribe operation fails.
258    fn unsubscribe_pool_flash_events(
259        &mut self,
260        cmd: &UnsubscribePoolFlashEvents,
261    ) -> anyhow::Result<()> {
262        if self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
263            self.subscriptions_pool_flash.remove(&cmd.instrument_id);
264            self.client.unsubscribe_pool_flash_events(cmd)?;
265        }
266        Ok(())
267    }
268
269    /// Sends a pool snapshot request for a given AMM pool.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the client fails to process the pool snapshot request.
274    pub fn request_pool_snapshot(&self, req: &RequestPoolSnapshot) -> anyhow::Result<()> {
275        self.client.request_pool_snapshot(req)
276    }
277}