nautilus_data/defi/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific data client functionality.
17//!
18//! This module provides DeFi subscription and request helper methods
19//! for the `DataClientAdapter`. All code in this module requires the `defi` feature flag.
20
21use std::fmt::{Debug, Display};
22
23use nautilus_common::messages::defi::{
24    DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
25    SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
26    SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
27    UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates,
28    UnsubscribePoolSwaps,
29};
30
31use crate::client::DataClientAdapter;
32
33impl DataClientAdapter {
34    #[inline]
35    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
36        if let Err(e) = match cmd {
37            DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
38            DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
39            DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
40            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
41                self.subscribe_pool_liquidity_updates(cmd)
42            }
43            DefiSubscribeCommand::PoolFeeCollects(cmd) => self.subscribe_pool_fee_collects(cmd),
44            DefiSubscribeCommand::PoolFlashEvents(cmd) => self.subscribe_pool_flash_events(cmd),
45        } {
46            log_command_error(&cmd, &e);
47        }
48    }
49
50    #[inline]
51    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
52        if let Err(e) = match cmd {
53            DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
54            DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
55            DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
56            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
57                self.unsubscribe_pool_liquidity_updates(cmd)
58            }
59            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => self.unsubscribe_pool_fee_collects(cmd),
60            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => self.unsubscribe_pool_flash_events(cmd),
61        } {
62            log_command_error(&cmd, &e);
63        }
64    }
65
66    /// Executes a DeFi data request command by dispatching to the appropriate handler.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the underlying client request fails.
71    #[inline]
72    pub fn execute_defi_request(&self, cmd: &DefiRequestCommand) -> anyhow::Result<()> {
73        match cmd {
74            DefiRequestCommand::PoolSnapshot(cmd) => self.request_pool_snapshot(cmd),
75        }
76    }
77
78    /// Subscribes to block events for the specified blockchain.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the underlying client subscribe operation fails.
83    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
84        if !self.subscriptions_blocks.contains(&cmd.chain) {
85            self.subscriptions_blocks.insert(cmd.chain);
86            self.client.subscribe_blocks(cmd)?;
87        }
88        Ok(())
89    }
90
91    /// Unsubscribes from block events for the specified blockchain.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the underlying client unsubscribe operation fails.
96    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
97        if self.subscriptions_blocks.contains(&cmd.chain) {
98            self.subscriptions_blocks.remove(&cmd.chain);
99            self.client.unsubscribe_blocks(cmd)?;
100        }
101        Ok(())
102    }
103
104    /// Subscribes to pool definition updates for the specified AMM pool.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the underlying client subscribe operation fails.
109    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
110        if !self.subscriptions_pools.contains(&cmd.instrument_id) {
111            self.subscriptions_pools.insert(cmd.instrument_id);
112            self.client.subscribe_pool(cmd)?;
113        }
114        Ok(())
115    }
116
117    /// Subscribes to pool swap events for the specified AMM pool.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the underlying client subscribe operation fails.
122    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
123        if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
124            self.subscriptions_pool_swaps.insert(cmd.instrument_id);
125            self.client.subscribe_pool_swaps(cmd)?;
126        }
127        Ok(())
128    }
129
130    /// Subscribes to pool liquidity update events for the specified AMM pool.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the underlying client subscribe operation fails.
135    fn subscribe_pool_liquidity_updates(
136        &mut self,
137        cmd: &SubscribePoolLiquidityUpdates,
138    ) -> anyhow::Result<()> {
139        if !self
140            .subscriptions_pool_liquidity_updates
141            .contains(&cmd.instrument_id)
142        {
143            self.subscriptions_pool_liquidity_updates
144                .insert(cmd.instrument_id);
145            self.client.subscribe_pool_liquidity_updates(cmd)?;
146        }
147        Ok(())
148    }
149
150    /// Subscribes to pool fee collect events for the specified AMM pool.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the underlying client subscribe operation fails.
155    fn subscribe_pool_fee_collects(
156        &mut self,
157        cmd: &SubscribePoolFeeCollects,
158    ) -> anyhow::Result<()> {
159        if !self
160            .subscriptions_pool_fee_collects
161            .contains(&cmd.instrument_id)
162        {
163            self.subscriptions_pool_fee_collects
164                .insert(cmd.instrument_id);
165            self.client.subscribe_pool_fee_collects(cmd)?;
166        }
167        Ok(())
168    }
169
170    /// Subscribes to pool flash loan events for the specified AMM pool.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the underlying client subscribe operation fails.
175    fn subscribe_pool_flash_events(
176        &mut self,
177        cmd: &SubscribePoolFlashEvents,
178    ) -> anyhow::Result<()> {
179        if !self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
180            self.subscriptions_pool_flash.insert(cmd.instrument_id);
181            self.client.subscribe_pool_flash_events(cmd)?;
182        }
183        Ok(())
184    }
185
186    /// Unsubscribes from pool definition updates for the specified AMM pool.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if the underlying client unsubscribe operation fails.
191    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
192        if self.subscriptions_pools.contains(&cmd.instrument_id) {
193            self.subscriptions_pools.remove(&cmd.instrument_id);
194            self.client.unsubscribe_pool(cmd)?;
195        }
196        Ok(())
197    }
198
199    /// Unsubscribes from swap events for the specified AMM pool.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the underlying client unsubscribe operation fails.
204    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
205        if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
206            self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
207            self.client.unsubscribe_pool_swaps(cmd)?;
208        }
209        Ok(())
210    }
211
212    /// Unsubscribes from pool liquidity update events for the specified AMM pool.
213    ///
214    /// # Errors
215    ///
216    /// Returns an error if the underlying client unsubscribe operation fails.
217    fn unsubscribe_pool_liquidity_updates(
218        &mut self,
219        cmd: &UnsubscribePoolLiquidityUpdates,
220    ) -> anyhow::Result<()> {
221        if self
222            .subscriptions_pool_liquidity_updates
223            .contains(&cmd.instrument_id)
224        {
225            self.subscriptions_pool_liquidity_updates
226                .remove(&cmd.instrument_id);
227            self.client.unsubscribe_pool_liquidity_updates(cmd)?;
228        }
229        Ok(())
230    }
231
232    /// Unsubscribes from pool fee collect events for the specified AMM pool.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if the underlying client unsubscribe operation fails.
237    fn unsubscribe_pool_fee_collects(
238        &mut self,
239        cmd: &UnsubscribePoolFeeCollects,
240    ) -> anyhow::Result<()> {
241        if self
242            .subscriptions_pool_fee_collects
243            .contains(&cmd.instrument_id)
244        {
245            self.subscriptions_pool_fee_collects
246                .remove(&cmd.instrument_id);
247            self.client.unsubscribe_pool_fee_collects(cmd)?;
248        }
249        Ok(())
250    }
251
252    /// Unsubscribes from pool flash loan events for the specified AMM pool.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the underlying client unsubscribe operation fails.
257    fn unsubscribe_pool_flash_events(
258        &mut self,
259        cmd: &UnsubscribePoolFlashEvents,
260    ) -> anyhow::Result<()> {
261        if self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
262            self.subscriptions_pool_flash.remove(&cmd.instrument_id);
263            self.client.unsubscribe_pool_flash_events(cmd)?;
264        }
265        Ok(())
266    }
267
268    /// Sends a pool snapshot request for a given AMM pool.
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if the client fails to process the pool snapshot request.
273    pub fn request_pool_snapshot(&self, req: &RequestPoolSnapshot) -> anyhow::Result<()> {
274        self.client.request_pool_snapshot(req)
275    }
276}
277
278#[inline(always)]
279fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
280    log::error!("Error on {cmd:?}: {e}");
281}